import pandas as pd
import numpy as np
import baostock as bs
import requests
from datetime import datetime, timedelta
import time
# import talib as ta
#使用quant()函数替换
class GP:
    """Fetch China A-share quotes and derive technical indicators.

    Daily history is queried through baostock; the current day's quote is
    read from the Tencent endpoint ``qt.gtimg.cn``.  Methods that talk to the
    quote endpoint return ``False`` when no usable data came back.
    """

    # Tencent quote endpoint; the symbol list is appended to this prefix.
    _QUOTE_URL = "https://qt.gtimg.cn/q="
    _QUOTE_HEADERS = {
        'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
    }
    # Seconds before an HTTP request is abandoned instead of blocking forever.
    _QUOTE_TIMEOUT = 10
    # Responses/records of this length or shorter are treated as "no data".
    _MIN_PAYLOAD_LEN = 30

    def __init__(self, bs=True):
        """Create the client; log in to baostock unless ``bs`` is falsy.

        Note: the parameter ``bs`` shadows the ``baostock`` module alias only
        inside this constructor; the name is kept for caller compatibility.
        """
        if bs:
            self.login()

    def login(self):
        """Open the baostock session."""
        bs.login()

    def logout(self):
        """Close the baostock session."""
        bs.logout()

    def get_history(self, gp_type, code, end_date=False, start_date=False, days=14):
        """Return forward-adjusted daily bars for one stock as a DataFrame.

        Args:
            gp_type: Exchange prefix such as ``"sh"`` / ``"sz"`` / ``"bj"``.
            code: Stock code; joined to ``gp_type`` as ``"<type>.<code>"``.
            end_date: Last day, ``"YYYY-MM-DD"``; defaults to today.
            start_date: First day, ``"YYYY-MM-DD"``; defaults to ``end_date``
                minus ``days`` calendar days.
            days: Look-back span used only when ``start_date`` is not given.

        Returns:
            DataFrame with typed columns.  An empty result is returned as-is
            (no type conversion) instead of raising.
        """
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        if not start_date:
            start_date = (
                datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days)
            ).strftime("%Y-%m-%d")

        rs = bs.query_history_k_data_plus(
            code=gp_type + "." + str(code),
            fields="code,date,open,high,low,close,preclose,volume,pctChg,turn,isST,adjustflag",
            start_date=start_date,  # dates must contain dashes
            end_date=end_date,
            frequency="d",          # daily bars
            adjustflag="2",         # 2 = forward-adjusted prices
        )
        df = rs.get_data()
        if df.empty:
            return df

        # An empty 'preclose' marks an incomplete trailing row.  This check has
        # to run BEFORE blanks are replaced with 0, otherwise it can never match.
        if df['preclose'].iloc[-1] == "":
            df = df.drop(df.index[-1])

        # Remaining blank cells become 0 so the numeric casts below succeed.
        df = df.replace("", 0)
        df = df.astype({
            "code": str,
            "open": float,
            "high": float,
            "low": float,
            "close": float,
            "preclose": float,
            "volume": int,
            "pctChg": float,
            "turn": float,
            "isST": int,
            "adjustflag": int,
        })
        return df

    def gp_code(self, code):
        """Map a stock code to ``[exchange_prefix, code_string]``.

        The exchange is chosen from the first character: 0-3 -> ``"sz"``,
        5/6 -> ``"sh"``, 4/8/9 -> ``"bj"``.  Returns ``False`` for any other
        leading character and for an empty code.
        """
        str_code = str(code)
        first = str_code[:1]  # slicing keeps an empty code from raising
        if first in ('0', '1', '2', '3'):
            gp_type = 'sz'
        elif first in ('5', '6'):
            gp_type = 'sh'
        elif first in ('4', '8', '9'):
            gp_type = 'bj'
        else:
            return False
        return [gp_type, str_code]

    def _fetch_quote_text(self, symbols):
        """GET the quote text for ``symbols``; return ``None`` if unusable.

        "Unusable" means a non-200 status or a body of at most
        ``_MIN_PAYLOAD_LEN`` bytes.  The body is decoded as GBK.
        """
        res = requests.get(
            self._QUOTE_URL + symbols,
            headers=self._QUOTE_HEADERS,
            timeout=self._QUOTE_TIMEOUT,
        )
        if res.status_code != 200 or len(res.content) <= self._MIN_PAYLOAD_LEN:
            return None
        return res.content.decode("gbk")

    @staticmethod
    def _split_record(record):
        """Collapse runs of '~' (2 or 3) to one and split the record on '~'."""
        return record.replace("~~~", "~").replace("~~", "~").split('~')

    @staticmethod
    def _parse_quote(code, fields):
        """Build the ``[bar, extra]`` pair from one split quote record.

        ``bar`` uses the same column names as ``get_history``; ``extra``
        carries the raw ``name`` and ``money`` fields.  Raises ``IndexError``
        or ``ValueError`` when the record is too short or a numeric field
        cannot be parsed.
        """
        bar = {
            "code": code,
            "date": datetime.now().strftime("%Y-%m-%d"),
            "open": float(fields[5]),
            "high": float(fields[32]),
            "low": float(fields[33]),
            "close": float(fields[3]),
            "preclose": float(fields[4]),
            # NOTE(review): x100 presumably converts lots to shares - confirm.
            "volume": float(fields[6]) * 100,
            "pctChg": float(fields[31]),
            "turn": float(fields[37]),
            "isST": 0,        # ST status is not reported here; default to 0
            "adjustflag": 2,  # kept in line with the baostock history rows
        }
        extra = {"name": fields[1], "money": fields[42]}
        return [bar, extra]

    def get_today(self, gp_type, code):
        """Return today's quote for one stock as ``[bar, extra]``.

        Returns ``False`` when the request fails, the payload is too short,
        or the payload cannot be parsed.
        """
        str_code = str(code)
        text = self._fetch_quote_text(gp_type + str_code)
        if text is None:
            return False
        try:
            return self._parse_quote(gp_type + "." + str_code, self._split_record(text))
        except (IndexError, ValueError):
            # Malformed payload: report "no data" like the other failure paths.
            return False

    @staticmethod
    def _is_trading_time(now):
        """True on Mon-Fri from 09:30 (inclusive) to 17:30 (exclusive)."""
        return now.weekday() < 5 and (9, 30) <= (now.hour, now.minute) < (17, 30)

    def gupiao(self, gp_type, gp_code, days=180):
        """Return recent daily bars, including today's bar during the day.

        On weekdays between 09:30 and 17:30 the history is extended with the
        live quote from ``get_today`` (history alone has no row for today);
        ``False`` is returned if that quote is missing or has zero volume.
        Outside that window only the history is returned.

        The window is compared numerically: the earlier string comparison
        against ``"9:30"`` could never match a zero-padded ``HH:MM`` value,
        so the live branch was unreachable.
        """
        if not self._is_trading_time(datetime.now()):
            print("user 17:30", gp_type, gp_code)
            return self.get_history(gp_type, gp_code, days=days)

        ma = self.get_today(gp_type, gp_code)
        if ma and int(ma[0]['volume']) != 0:
            df = self.get_history(gp_type, gp_code, days=days)
            return pd.concat([df, pd.DataFrame([ma[0]])], ignore_index=True)
        return False

    def gp_all(self, xstr_code):
        """Return today's quotes for a comma-separated symbol list.

        Args:
            xstr_code: Symbols such as ``"sh600000,sz000301,sz300010"``.

        Returns:
            A list of ``[bar, extra]`` pairs, one per usable record, or
            ``False`` when the request fails or the payload is too short.
            Each bar's ``code`` is the raw first field of its record (the
            text before the first '~'), not a normalised code.  Records that
            cannot be parsed are skipped so one bad entry does not discard
            the rest of the batch.
        """
        text = self._fetch_quote_text(xstr_code)
        if text is None:
            return False

        quotes = []
        for record in text.split(';'):
            if len(record) <= self._MIN_PAYLOAD_LEN:
                continue
            fields = self._split_record(record)
            try:
                quotes.append(self._parse_quote(fields[0], fields))
            except (IndexError, ValueError):
                continue
        return quotes

    def quant(self, df):
        """Return a copy of ``df`` extended with technical indicators.

        Expects the columns ``open``, ``high``, ``low``, ``close`` and
        ``volume``.  Adds volume/price moving averages (5/10/20/60), ``tr``,
        ``atr``, ``rs_14``/``rsi_14``, ``rs_6``/``rsi_6``, ``money_ma5``,
        ``status``, ``pp``, ``kk``, the MACD columns (``ema12``, ``ema26``,
        ``dif``, ``dea``, ``macd``) and the KDJ columns (``rsv``, ``k``,
        ``d``, ``j``).  Infinite and missing values are replaced with 0.

        The caller's frame is left untouched; all work happens on a copy.
        """
        df = df.copy()
        windows = (5, 10, 20, 60)

        # Volume and close-price simple moving averages.
        for w in windows:
            df[f'vol_ma{w}'] = df["volume"].rolling(window=w).mean()
        for w in windows:
            df[f'ma{w}'] = df["close"].rolling(window=w).mean()

        # True range and its 14-span exponential average (ATR).
        df['hl'] = df['high'] - df['low']
        df['hc'] = abs(df['high'] - df['close'].shift(1))
        df['lc'] = abs(df['low'] - df['close'].shift(1))
        df['tr'] = df[['hl', 'hc', 'lc']].abs().max(axis=1)  # row-wise maximum
        df['atr'] = df['tr'].ewm(span=14, adjust=False).mean()

        # RSI over 14 and 6 bars, from simple rolling means of gains/losses.
        df['delta'] = df['close'].diff()
        df['gain'] = df['delta'].where(df['delta'] > 0, 0)
        df['loss'] = -df['delta'].where(df['delta'] < 0, 0)
        for w in (14, 6):
            gain_col, loss_col = f'avg_gain({w})', f'avg_loss({w})'
            df[gain_col] = df['gain'].rolling(w).mean()
            df[loss_col] = df['loss'].rolling(w).mean()
            # A zero average loss would divide by zero; use a small floor.
            df[loss_col] = df[loss_col].replace(0, 0.01)
            df[f'rs_{w}'] = (df[gain_col] / df[loss_col]).clip(0, 100)
            df[f'rsi_{w}'] = 100 - (100 / (1 + df[f'rs_{w}']))

        # Turnover proxy: MA5 price times MA5 volume.
        df['money_ma5'] = df['ma5'] * df['vol_ma5']
        # 1 when the bar closed above its open, otherwise 0.
        df['status'] = np.where((df['close'] - df['open']) > 0, 1, 0)
        # Close-to-close price change.
        df['pp'] = df['close'] - df['close'].shift(1)
        # Volume relative to its 5-bar average.
        df['kk'] = df['volume'] / df['vol_ma5']

        # MACD: EMA(12), EMA(26), DIF, DEA(9) and the histogram.
        df['ema12'] = df['close'].ewm(span=12, adjust=False).mean()
        df['ema26'] = df['close'].ewm(span=26, adjust=False).mean()
        df['dif'] = df['ema12'] - df['ema26']
        df['dea'] = df['dif'].ewm(span=9, adjust=False).mean()
        df['macd'] = 2 * (df['dif'] - df['dea'])

        # KDJ over a 9-bar window.
        low_list = df['low'].rolling(9, min_periods=9).min()
        high_list = df['high'].rolling(9, min_periods=9).max()
        # A flat window (high == low) has a zero denominator; use 50 instead.
        df['rsv'] = np.where(
            high_list == low_list,
            50,
            (df['close'] - low_list) / (high_list - low_list) * 100
        )
        df['k'] = df['rsv'].ewm(com=2, adjust=False).mean()
        df['d'] = df['k'].ewm(com=2, adjust=False).mean()
        df['j'] = 3 * df['k'] - 2 * df['d']

        # Normalise every inf/NaN (warm-up rows, zero divisions) to 0.
        df = df.replace([np.inf, -np.inf], np.nan)
        df = df.fillna(0)

        # Remove the scratch columns used only for the calculations above.
        drop_cols = [
            'hl', 'hc', 'lc', 'delta', 'gain', 'loss',
            'avg_gain(14)', 'avg_loss(14)', 'avg_gain(6)', 'avg_loss(6)'
        ]
        df = df.drop(columns=drop_cols, errors='ignore')
        return df
#1 10收盘价接近10日最高点 10
# df.iloc[-1]['close']>(8*(df['high'].iloc[-10:].max()-df['high'].iloc[-10:].min())/10+df['high'].iloc[-10:].min())
# #2 收盘价大于MA5 10
# df.iloc[-1]['close']>df.iloc[-1]['ma5']
# #3 今日MA5>昨日MA5 10
# df.iloc[-1]['ma5']>df.iloc[-2]['ma5']
# #4 今日ATR在最近20个交易日内ATR排序 10
# atrr=(df['atr'].iloc[-20:]<df['atr'].iloc[-1]).sum()/20
# atrr>0.2 and atrr<0.8
# #5 平均成交额大于2亿 10
# df.iloc[-1]['money_ma5']>200000000
# #6 RSI 6小于70 10
# df.iloc[-1]['rsi(6)']<70
# df.iloc[-1]['rsi(6)']>30
# #7 收盘价大于MA20 10
# df.iloc[-1]['close']>df.iloc[-1]['ma20']
# #8 今日MA20>昨日MA20 10
# df.iloc[-1]['ma20']>df.iloc[-2]['ma20']
# #9 20日新高点 10
# df.iloc[-1]['high'] > df['high'].iloc[-20:-1].max()
# #10 量价指标 -100 -10 5 15
# p=df.iloc[-1]['close']-df.iloc[-2]['close']
# v=df.iloc[-1]['volume']/df['volume'].iloc[-20:].mean()
#p>0 2>v>1.5 +15
#p>0 v>1 +5
#p>0 v<1 -10 缩量
#p<0 v>2 -100 恐慌抛售
#p<=0 v<1 0 横盘
def score_def(df):
    """Score the latest bar of an indicator frame and list the reasons.

    ``df`` needs at least two rows and the columns ``close``, ``high``,
    ``volume``, ``ma5``, ``ma20``, ``atr``, ``money_ma5`` and ``rsi_6``
    (the name produced by ``GP.quant``; the older ``rsi(6)`` spelling is
    still accepted when ``rsi_6`` is absent).

    Nine trend/volatility checks add 10 points each; a final price/volume
    rule adds +15 / +5 / 0 or subtracts 10 / 20.

    Returns:
        ``[score, score_list, last_row_as_dict, last_five_rows_as_records]``
        where ``score_list`` holds one label per rule that fired.
    """
    last = df.iloc[-1]
    prev = df.iloc[-2]

    # quant() writes 'rsi_6'; reading 'rsi(6)' unconditionally raised KeyError.
    rsi_col = 'rsi_6' if 'rsi_6' in df.columns else 'rsi(6)'
    rsi6 = last[rsi_col]

    # Share of the last 20 ATR values that lie below the latest one.
    atrr = (df['atr'].iloc[-20:] < last['atr']).sum() / 20
    # Close-to-close change and volume relative to the 20-bar mean.
    p = last['close'] - prev['close']
    v = last['volume'] / df['volume'].iloc[-20:].mean()

    # Line at 80% of the range spanned by the last 10 highs.
    highs10 = df['high'].iloc[-10:]
    near_high_line = 8 * (highs10.max() - highs10.min()) / 10 + highs10.min()

    ten_point_rules = [
        (last['close'] > near_high_line, "收盘价接近10日最高点 +10"),
        (last['close'] > last['ma5'], "收盘价大于MA5 +10"),
        (last['ma5'] > prev['ma5'], "今日MA5>昨日MA5 +10"),
        (atrr > 0.2 and atrr < 0.8, "ATR在20日区间位置位于中部 +10"),
        (last['money_ma5'] > 200000000, "平均成交额大于2亿 +10"),
        (rsi6 < 70 and rsi6 > 30, "RSI6小于70大于30 +10"),
        (last['close'] > last['ma20'], "收盘价大于MA20 +10"),
        (last['ma20'] > prev['ma20'], "今日MA20>昨日MA20 +10"),
        # Compared against the previous 19 highs (today's bar excluded).
        (last['high'] > df['high'].iloc[-20:-1].max(), "20日新高点 +10"),
    ]

    score = 0
    score_list = []
    for passed, label in ten_point_rules:
        if passed:
            score += 10
            score_list.append(label)

    # Price/volume rule: exactly one branch fires.
    if p > 0:  # price rose
        if v > 1.5:
            score += 15
            score_list.append("健康上涨 +15")
        elif v >= 1:  # 1 <= v <= 1.5
            score += 5
            score_list.append("温和放量 +5")
        else:  # v < 1
            score -= 10
            score_list.append("缩量背离 -10")
    else:  # price fell or was flat
        if v >= 2:
            score -= 20
            score_list.append("恐慌下跌 -20")
        elif v >= 1:  # 1 <= v < 2
            score -= 10
            score_list.append("放量下跌 -10")
        else:  # v < 1
            score_list.append("缩量整理 0")

    return [score, score_list, last.to_dict(), df.iloc[-5:].to_dict('records')]
# gp=GP()
# co=gp.gp_code('601515')
# df=gp.gupiao(co[0],co[1],days=60)
# gp.logout()
# df=gp.quant(df)
# for i in range(10):
# print(score_def(df))
# df=df.drop(df.index[-1])
# 腾讯接口 (Tencent quote API)
import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
# 数据库配置
import os

# Connection settings.  Each value can be overridden through an environment
# variable so credentials do not have to live in the source; the fallbacks
# are the previous hard-coded values, so existing setups keep working.
config = {
    "host": os.environ.get("QUANT_DB_HOST", "localhost"),
    "port": int(os.environ.get("QUANT_DB_PORT", "3306")),
    "user": os.environ.get("QUANT_DB_USER", "root"),
    "password": os.environ.get("QUANT_DB_PASSWORD", "123456"),
    "database": os.environ.get("QUANT_DB_NAME", "db"),
    "charset": "utf8mb4"
}
# Module-level connection and cursor shared by the insert helpers below.
conn = pymysql.connect(**config)
cursor = conn.cursor()
# ===================== ✅ 极速插入:INSERT IGNORE =====================
def insert_datan(data_list):
    """Bulk-insert quote rows into ``cmf_quantn``, skipping duplicates.

    Args:
        data_list: Sequence of 14-item rows ordered as the column list in
            the statement below.

    Returns:
        ``(insert_count, skip_count)``: rows reported as affected by the
        cursor, and the remainder of ``data_list`` (rows ignored by
        ``INSERT IGNORE``).  An empty ``data_list`` returns ``(0, 0)``
        without touching the database.

    Raises:
        Whatever the driver raises; the transaction is rolled back first.
    """
    if not data_list:
        # Nothing to send; also avoids reading a stale cursor.rowcount.
        return 0, 0

    sql = """
        INSERT IGNORE INTO `cmf_quantn`
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`,`is_now`
        )
        VALUES
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s
        ) """
    try:
        # One batched statement; duplicate keys are skipped by INSERT IGNORE.
        cursor.executemany(sql, data_list)
        conn.commit()
    except Exception:
        # Do not leave a half-applied transaction open on the shared connection.
        conn.rollback()
        raise

    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count
def do_coden(exchange, code):
    """Fetch today's quote for one stock and store it via ``insert_datan``.

    Nothing is stored or printed when ``gp.get_today`` returns a falsy
    value.  The bare code is taken from the quote's ``code`` field with its
    first three characters removed.
    """
    start = time.time()
    quote = gp.get_today(exchange, code)
    if not quote:
        return

    bar = quote[0]
    value_keys = (
        'code', 'date', 'open', 'high', 'low', 'close', 'preclose',
        'volume', 'pctChg', 'turn', 'isST', 'adjustflag',
    )
    # Row layout: bare code, the twelve quote fields, then the insert time.
    record = [bar['code'][3:]]
    record.extend(bar[key] for key in value_keys)
    record.append(int(time.time()))

    insert_count, skip_count = insert_datan([record])
    print(f"股票 {code} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
def insert_all(info, code_str):
    """Store a batch of quotes returned by ``GP.gp_all``.

    ``info`` is a sequence of ``[bar, extra]`` pairs; a falsy ``info`` is a
    no-op.  Each bar's ``code`` is stripped, then characters 2-3 are used as
    the exchange and characters 4-9 as the stock code.  ``code_str`` is only
    used in the summary line that is printed.
    """
    start = time.time()
    if not info:
        return

    value_keys = (
        'date', 'open', 'high', 'low', 'close', 'preclose',
        'volume', 'pctChg', 'turn', 'isST', 'adjustflag',
    )
    records = []
    for entry in info:
        bar = entry[0]
        raw_code = bar['code'].strip()
        exchange, number = raw_code[2:4], raw_code[4:10]
        record = [number, exchange + "." + number]
        record.extend(bar[key] for key in value_keys)
        record.append(int(time.time()))
        records.append(record)

    insert_count, skip_count = insert_datan(records)
    print(f"股票 {code_str} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
# Quote-only client: no baostock login is needed for the Tencent endpoint.
gp = GP(False)
# Single-stock debugging:
# info = gp.get_today("sh", "600000")
# print(info)
# do_coden("sh", "600000")

# all.csv supplies 'exchange' and 'code'; codes stay strings so leading
# zeros survive.
df = pd.read_csv('all.csv', dtype={'code': str})
symbols = [exchange + code for exchange, code in zip(df['exchange'], df['code'])]

BATCH_SIZE = 25        # symbols per request
MIN_BATCH_SECONDS = 4  # minimum time between two requests

try:
    # Slice the symbol list so every stock lands in exactly one batch.  The
    # previous loop never queued the row that triggered a flush, so every
    # 26th stock was skipped, and it logged an already-cleared batch string.
    for offset in range(0, len(symbols), BATCH_SIZE):
        c = time.time()
        code_str = ",".join(symbols[offset:offset + BATCH_SIZE])
        print(code_str)
        ccc = gp.gp_all(code_str)
        insert_all(ccc, code_str)
        # Throttle between requests; no wait is needed after the last batch.
        if offset + BATCH_SIZE < len(symbols):
            x = MIN_BATCH_SECONDS - (time.time() - c)
            if x > 0:
                time.sleep(x)
finally:
    # Release the shared database handles even if a batch fails.
    cursor.close()
    conn.close()