股票查询

def gp_search(code):
    url="http://qt.gtimg.cn/q="+code
    mm.header=["User-Agent: MyCustomAgent/1.0","mam: i am hera"]
    c=mm.get(url)
    print(c)
    return c


from machine import UART
import time

class ML307(object):
    def __init__(self,rx,tx,u_num=1):
        self.uart=UART(u_num,115200,rx=rx,tx=tx,rxbuf=4096,txbuf=4096)
        self.header=[]
        time.sleep(3)
    def command(self,cmd,t=1):
        self.uart.write(cmd+'\r\n')
        time.sleep(t*0.1)
        return self.uart.read()
    def get_response(self,info):
        nx=info.split(b'MHTTPURC: "content"')
        rep_1=nx[1].split(b",")
        rep=b"".join(rep_1[5:])
        return rep
    def get_url(self,url_1,url_2,t=15):
        self.command('AT+MHTTPCREATE="'+url_1+'"')
        self.header_def()
        info=self.command('AT+MHTTPREQUEST=0,1,0,"'+url_2+'"',t)
        self.command('AT+MHTTPHEADER=0')
        return self.get_response(info)
    def get_url_ssl(self,url_1,url_2,t=15):
        self.command('AT+MHTTPCREATE="'+url_1+'"')
        self.command('AT+MHTTPCFG="ssl",0,1,1')
        self.header_def()
        info=self.command('AT+MHTTPREQUEST=0,1,0,"'+url_2+'"',t)
        self.command('AT+MHTTPHEADER=0')
        return self.get_response(info)
    def make_url(self,url):
        url_1="/".join(url.split("/")[0:3])
        url_2="/"+"/".join(url.split("/")[3:])
        return [url_1,url_2]
    def get(self,url):
        ua=self.make_url(url)
        try:
            if "https" in ua[0]:
                ba=self.get_url_ssl(ua[0],ua[1])
            else:
                ba=self.get_url(ua[0],ua[1])
            return ba
        except Exception as e:
            return False
    def post_form(self,info):
        form=""
        for key in info:
            form=form+key+"="+str(info[key])+"&"
        return form
    def post_url(self,url_1,url_2,form_str,t=15):
        self.command('AT+MHTTPCREATE="'+url_1+'"')
        self.header_def()
        self.command('AT+MHTTPHEADER=0,0,0,"Content-Type: application/x-www-form-urlencoded"')
        self.command('AT+MHTTPCONTENT=0,0,0,"'+form_str+'"')
        info=self.command('AT+MHTTPREQUEST=0,2,0,"'+url_2+'"',t)
        self.command('AT+MHTTPHEADER=0')
        return self.get_response(info)
    def post_url_ssl(self,url_1,url_2,form_str,t=15):
        self.command('AT+MHTTPCREATE="'+url_1+'"')
        self.command('AT+MHTTPCFG="ssl",0,1,1')
        self.header_def()
        self.command('AT+MHTTPHEADER=0,0,0,"Content-Type: application/x-www-form-urlencoded"')
        self.command('AT+MHTTPCONTENT=0,0,0,"'+form_str+'"')
        info=self.command('AT+MHTTPREQUEST=0,2,0,"'+url_2+'"',t)
        self.command('AT+MHTTPHEADER=0')
        return self.get_response(info)
    def post(self,url,dic_info):
        form_str=self.post_form(dic_info)
        ua=self.make_url(url)
        try:
            if "https" in ua[0]:
                ba=self.post_url_ssl(ua[0],ua[1],form_str)
            else:
                ba=self.post_url(ua[0],ua[1],form_str)
            return ba
        except Exception as e:
            return False
    def header_def(self):
        lh=len(self.header)
        if lh>1:
            for i in range(lh):
                if i <lh-1:
                    self.command('AT+MHTTPHEADER=0,1,0,"'+self.header[i]+'"')
                else:
                    self.command('AT+MHTTPHEADER=0,0,0,"'+self.header[i]+'"')
        elif lh==1:
            self.command('AT+MHTTPHEADER=0,0,0,"'+self.header[0]+'"')
        else:
            pass
def gp_data(raw_data, target_stocks=None):
    """
    解析股票数据,支持目标股票列表兜底,避免漏股
    :param raw_data: ML307返回的原始字节数据
    :param target_stocks: 目标股票列表(如["sz000858","sh600519"...]),防止漏股
    :return: 完整股票字典(5只全保留)
    """
    # 步骤1:数据有效性校验+初始化结果(兜底目标股票,防止漏股)
    stock_result = {}
    if target_stocks:
        for code in target_stocks:
            stock_result[code] = {
                'code': code,
                'current_price': '0.00',
                'open_price': '0.00',
                'turnover_rate': '0.00'
            }
    
    if not raw_data or len(raw_data) == 0:
        return stock_result
    
    # 步骤2:清洗无效字节(\r\n+ 全去掉,兼容接口截断)
    clean_data = raw_data.replace(b'\r', b'').replace(b'\n', b'').replace(b'+', b'')
    
    # 步骤3:拆分多只股票(按;分割,兼容末尾残缺记录)
    stock_records = clean_data.split(b';')
    
    # 步骤4:逐只解析(按你的原始数据,换手率精准索引38!)
    for record in stock_records:
        if not record or b'v_' not in record or b'=' not in record:
            continue
        
        # 拆分股票代码和字段串(字节操作,无编码报错)
        code_part, field_part = record.split(b'=', 1)
        stock_code = code_part.replace(b'v_', b'').decode('ascii')
        
        # 清洗字段串+按~拆分
        field_str = field_part.strip(b'"')
        fields = field_str.split(b'~')
        
        # 步骤5:精准提取(逐字节核对你的原始数据,索引固定!)
        # 核心:换手率=索引38,当前价=3,开盘价=4,字段够39个才取,否则用默认值
        current_price = fields[3].decode('ascii') if len(fields)>=4 else '0.00'
        open_price = fields[4].decode('ascii') if len(fields)>=5 else '0.00'
        turnover_rate = fields[38].decode('ascii') if len(fields)>=39 else '0.00'
        
        # 更新结果(覆盖兜底默认值)
        stock_result[stock_code] = {
            'code': stock_code,
            'current_price': current_price,
            'open_price': open_price,
            'turnover_rate': turnover_rate
        }
    
    return stock_result
此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复