def gp_search(code):
url="http://qt.gtimg.cn/q="+code
mm.header=["User-Agent: MyCustomAgent/1.0","mam: i am hera"]
c=mm.get(url)
print(c)
return c
from machine import UART
import time
class ML307(object):
def __init__(self,rx,tx,u_num=1):
self.uart=UART(u_num,115200,rx=rx,tx=tx,rxbuf=4096,txbuf=4096)
self.header=[]
time.sleep(3)
def command(self,cmd,t=1):
self.uart.write(cmd+'\r\n')
time.sleep(t*0.1)
return self.uart.read()
def get_response(self,info):
nx=info.split(b'MHTTPURC: "content"')
rep_1=nx[1].split(b",")
rep=b"".join(rep_1[5:])
return rep
def get_url(self,url_1,url_2,t=15):
self.command('AT+MHTTPCREATE="'+url_1+'"')
self.header_def()
info=self.command('AT+MHTTPREQUEST=0,1,0,"'+url_2+'"',t)
self.command('AT+MHTTPHEADER=0')
return self.get_response(info)
def get_url_ssl(self,url_1,url_2,t=15):
self.command('AT+MHTTPCREATE="'+url_1+'"')
self.command('AT+MHTTPCFG="ssl",0,1,1')
self.header_def()
info=self.command('AT+MHTTPREQUEST=0,1,0,"'+url_2+'"',t)
self.command('AT+MHTTPHEADER=0')
return self.get_response(info)
def make_url(self,url):
url_1="/".join(url.split("/")[0:3])
url_2="/"+"/".join(url.split("/")[3:])
return [url_1,url_2]
def get(self,url):
ua=self.make_url(url)
try:
if "https" in ua[0]:
ba=self.get_url_ssl(ua[0],ua[1])
else:
ba=self.get_url(ua[0],ua[1])
return ba
except Exception as e:
return False
def post_form(self,info):
form=""
for key in info:
form=form+key+"="+str(info[key])+"&"
return form
def post_url(self,url_1,url_2,form_str,t=15):
self.command('AT+MHTTPCREATE="'+url_1+'"')
self.header_def()
self.command('AT+MHTTPHEADER=0,0,0,"Content-Type: application/x-www-form-urlencoded"')
self.command('AT+MHTTPCONTENT=0,0,0,"'+form_str+'"')
info=self.command('AT+MHTTPREQUEST=0,2,0,"'+url_2+'"',t)
self.command('AT+MHTTPHEADER=0')
return self.get_response(info)
def post_url_ssl(self,url_1,url_2,form_str,t=15):
self.command('AT+MHTTPCREATE="'+url_1+'"')
self.command('AT+MHTTPCFG="ssl",0,1,1')
self.header_def()
self.command('AT+MHTTPHEADER=0,0,0,"Content-Type: application/x-www-form-urlencoded"')
self.command('AT+MHTTPCONTENT=0,0,0,"'+form_str+'"')
info=self.command('AT+MHTTPREQUEST=0,2,0,"'+url_2+'"',t)
self.command('AT+MHTTPHEADER=0')
return self.get_response(info)
def post(self,url,dic_info):
form_str=self.post_form(dic_info)
ua=self.make_url(url)
try:
if "https" in ua[0]:
ba=self.post_url_ssl(ua[0],ua[1],form_str)
else:
ba=self.post_url(ua[0],ua[1],form_str)
return ba
except Exception as e:
return False
def header_def(self):
lh=len(self.header)
if lh>1:
for i in range(lh):
if i <lh-1:
self.command('AT+MHTTPHEADER=0,1,0,"'+self.header[i]+'"')
else:
self.command('AT+MHTTPHEADER=0,0,0,"'+self.header[i]+'"')
elif lh==1:
self.command('AT+MHTTPHEADER=0,0,0,"'+self.header[0]+'"')
else:
pass
def gp_data(raw_data, target_stocks=None):
"""
解析股票数据,支持目标股票列表兜底,避免漏股
:param raw_data: ML307返回的原始字节数据
:param target_stocks: 目标股票列表(如["sz000858","sh600519"...]),防止漏股
:return: 完整股票字典(5只全保留)
"""
# 步骤1:数据有效性校验+初始化结果(兜底目标股票,防止漏股)
stock_result = {}
if target_stocks:
for code in target_stocks:
stock_result[code] = {
'code': code,
'current_price': '0.00',
'open_price': '0.00',
'turnover_rate': '0.00'
}
if not raw_data or len(raw_data) == 0:
return stock_result
# 步骤2:清洗无效字节(\r\n+ 全去掉,兼容接口截断)
clean_data = raw_data.replace(b'\r', b'').replace(b'\n', b'').replace(b'+', b'')
# 步骤3:拆分多只股票(按;分割,兼容末尾残缺记录)
stock_records = clean_data.split(b';')
# 步骤4:逐只解析(按你的原始数据,换手率精准索引38!)
for record in stock_records:
if not record or b'v_' not in record or b'=' not in record:
continue
# 拆分股票代码和字段串(字节操作,无编码报错)
code_part, field_part = record.split(b'=', 1)
stock_code = code_part.replace(b'v_', b'').decode('ascii')
# 清洗字段串+按~拆分
field_str = field_part.strip(b'"')
fields = field_str.split(b'~')
# 步骤5:精准提取(逐字节核对你的原始数据,索引固定!)
# 核心:换手率=索引38,当前价=3,开盘价=4,字段够39个才取,否则用默认值
current_price = fields[3].decode('ascii') if len(fields)>=4 else '0.00'
open_price = fields[4].decode('ascii') if len(fields)>=5 else '0.00'
turnover_rate = fields[38].decode('ascii') if len(fields)>=39 else '0.00'
# 更新结果(覆盖兜底默认值)
stock_result[stock_code] = {
'code': stock_code,
'current_price': current_price,
'open_price': open_price,
'turnover_rate': turnover_rate
}
return stock_result
HTML页面处理
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="/static/jquery.js"></script>
<link rel="stylesheet" href="/static/bootstrap.css" crossorigin="anonymous">
<title>股票监控管理页面</title>
</head>
<body>
<div class="container ">
<div class="row">
<div class="jumbotron">
<h1>股票监控管理页面</h1>
<p>最大显示股票信息为10条。</p>
</div>
</div>
<br/>
<div class="row">
<div class="col-md-8 col-md-offset-2">
<div class="input-group input-group-lg">
<div class="input-group-addon">
<span>
输入代码
</span>
</div>
<input type="number" id="code" class="form-control">
<div class="input-group-btn">
<button class="btn btn-info" id="btn_gp">查询</button>
</div>
</div>
</div>
</div>
<br/>
<div class="row">
<div class="col-md-4">
<h2 class="text-info">查询到的股票:</h2>
</div>
<div class="show-info col-md-8">
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-4">
<h2 class="text-success">已保存的股票代码:</h2>
</div>
<ol class="added col-md-8">
</ol>
</div>
</div>
<script>
function add_one(mx){
var div=$("<li><h3>"+mx[2]+" <small> 代码"+mx[1]+"</small> </h3></li>");
var button=$("<button class='btn btn-danger' onclick=del_code('"+mx[0]+"','"+mx[1]+"','"+mx[2]+"')>删除</button>");
div.append(button);
$(".added").append(div);
// ty code name
}
// AJAX 1
function add_code(ty,code,name){
// 添加股票
console.log(ty,code,name);
url="/add";
// 添加网址 一个回调传入信息
$.ajax({
type:"POST",
url:url,
async:false,
data:{
ty:ty,
code:code,
name:name
},
success:function(data){
if(data.status){
console.log("添加完成");
get_all();//刷新
$(".show-info").html("");
}
}
})
}
// AJAX 2
function del_code(ty,code,name){
// 删除股票
console.log(ty,code,name);
url="/del";
// 添加网址 一个回调传入信息
$.ajax({
type:"POST",
url:url,
async:false,
data:{
ty:ty,
code:code,
name:name
},
success:function(data){
if(data.status){
console.log("删除完成");
get_all();//刷新
}
}
})
}
// AJAX 3
function get_all(){
$(".added").html("")
// 现有股票 URL
url="/all";
$.ajax({
type:"POST",
url:url,
async:false,
success:function(data){
console.log("查询完成");
if(data.status){
for (const key in data.info) {
add_one(data.info[key])
}
}
}
})
}
// AJAX 4
function fgp(cod='600111',ty='sz'){
var url="/gp?code="+ty+cod
var name=''
var price=''
var code=''
$.ajax({
type:"GET",
url:url,
async:false,
success:function(data){
e=data.split("~");
if(e[1]){
name=e[1];
code=e[2];
price=e[3];
}
}
})
if(name){
var mk=[name,ty,code,price]
return mk
}else{
return []
}
}
function add_btn(mx){
var div=$("<h3>"+mx[0]+" <small> 代码"+mx[2]+"股价:"+mx[3]+" </small> </h3>");
var button=$("<button class='btn btn-primary' onclick=add_code('"+mx[1]+"','"+mx[2]+"','"+mx[0]+"')>添加</button>");
div.append(button);
$(".show-info").append(div);
}
$("#btn_gp").click(function(){
var code=$("#code").val();
if(code.length==6){
$(".show-info").html("");
var jugg=code[0];
if(jugg==0 || jugg==2|| jugg==3){
var sz=fgp(code,"sz");
if(sz.length>1){
add_btn(sz);
}else{
alert("未查询到相关信息");
}
}else if(jugg==6 || jugg==9){
var sh=fgp(code,"sh");
if(sh.length>1){
add_btn(sh);
}else{
alert("未查询到相关信息");
}
}else if(jugg==4 || jugg==8){
var bj=fgp(code,"bj");
if(bj.length>1){
add_btn(bj);
}else{
alert("未查询到相关信息");
}
}else{
alert("未查询到相关信息");
}
$("#code").val("");
}else{
alert("股票代码为6位");
}
})
get_all();
// 执行查询
</script>
</body>
</html>