one
import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import copy
import warnings
warnings.filterwarnings("ignore") # 关掉烦人的警告
# 数据库配置
config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "123456",
"database": "db",
"charset": "utf8mb4"
}
conn = pymysql.connect(**config)
cursor = conn.cursor()
def get_info(mcode):
sql = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
df = pd.read_sql(sql, conn, params=(mcode,))
return df
# ===================== ✅ 极速插入:INSERT IGNORE =====================
def df_to_datalist(df):
insert_list = []
for _, row in df.iterrows():
data = (
row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
row['turn'], row['isST'], row['adjustflag'], row['ma5'], row['ma10'], row['ma20'], row['ma60'], row['vol_ma5'], row['vol_ma10'], row['vol_ma20'],
row['vol_ma60'], row['tr'], row['atr'], row['rs_6'], row['rsi_6'], row['rs_14'], row['rsi_14'], row['money_ma5'], row['status'], row['pp'],
row['kk'], row['ema12'], row['ema26'], row['dif'], row['dea'], row['macd'], row['rsv'], row['k'], row['d'], row['j']
)
insert_list.append(data)
return insert_list
def insert_data(data_list):
sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
, `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
, `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
) """
# 直接批量插入,重复自动跳过!
cursor.executemany(sql, data_list)
conn.commit()
# affected rows 就是插入成功的数量
insert_count = cursor.rowcount
skip_count = len(data_list) - insert_count
return insert_count, skip_count
def insert_datan(data_list):
sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`,`is_now`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s
) """
# 直接批量插入,重复自动跳过!
cursor.executemany(sql, data_list)
conn.commit()
# affected rows 就是插入成功的数量
insert_count = cursor.rowcount
skip_count = len(data_list) - insert_count
return insert_count, skip_count
def do_coden(exchange, code):
start = time.time()
dlx = gp.get_today(exchange, code)
# print(dlx)
if dlx:
row=dlx[0]
dx=[
row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
row['turn'], row['isST'], row['adjustflag'],int(time.time())
]
# print(dx)
insert_count, skip_count = insert_datan([dx])
print(f"股票 {code} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
def insert_all(info,code_str):
start=time.time()
if info:
dx=[]
for m in info:
row=m[0]
cv=copy.deepcopy(row)
cv['mcode']=row['code'].strip()[4:10]
cv['code']=row['code'].strip()[2:4]+"."+row['code'].strip()[4:10]
df_b=pd.DataFrame([cv])
df_a=get_info(cv['mcode'])
df=pd.concat([df_a,df_b],ignore_index=True)
df=gp.quant(df)
df = df.replace([float('inf'), -float('inf')], 0)
df = df.fillna(0)
data_list = df_to_datalist(df)
insert_count, skip_count = insert_data(data_list[-1:])
print(insert_count, skip_count)
dx.append([
row['code'].strip()[4:10], row['code'].strip()[2:4]+"."+row['code'].strip()[4:10], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
row['turn'], row['isST'], row['adjustflag'],int(time.time())
])
insert_count, skip_count = insert_datan(dx)
print(f"股票 {code_str} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
gp=GP(False)
# # info=gp.get_today("sh", "600000")
# # print(info)
# do_coden("sh", "600000")
df = pd.read_csv('all.csv', dtype={'code': str})
i=0
code_str=''
for index, row in df.iterrows():
c=time.time()
if i<25:
code_str=row['exchange']+row['code']+","+code_str
i=i+1
else:
print(code_str)
ccc=gp.gp_all(code_str)
i=0
code_str=''
insert_all(ccc,code_str)
x=4-(time.time()-c)
if x>0:
time.sleep(x)
ccc=gp.gp_all(code_str)
insert_all(ccc,code_str)
tquant
import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import json
from rules import rules
import warnings
warnings.filterwarnings("ignore") # 关掉烦人的警告
gp=GP(False)
# 数据库配置
config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "123456",
"database": "db",
"charset": "utf8mb4"
}
conn = pymysql.connect(**config)
cursor = conn.cursor()
def df_to_datalist(df):
insert_list = []
for _, row in df.iterrows():
data = (
row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
row['turn'], row['isST'], row['adjustflag'], row['ma5'], row['ma10'], row['ma20'], row['ma60'], row['vol_ma5'], row['vol_ma10'], row['vol_ma20'],
row['vol_ma60'], row['tr'], row['atr'], row['rs_6'], row['rsi_6'], row['rs_14'], row['rsi_14'], row['money_ma5'], row['status'], row['pp'],
row['kk'], row['ema12'], row['ema26'], row['dif'], row['dea'], row['macd'], row['rsv'], row['k'], row['d'], row['j']
)
insert_list.append(data)
return insert_list
def insert_data(data_list):
sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
, `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
, `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
) """
# 直接批量插入,重复自动跳过!
cursor.executemany(sql, data_list)
conn.commit()
# affected rows 就是插入成功的数量
insert_count = cursor.rowcount
skip_count = len(data_list) - insert_count
return insert_count, skip_count
def get_info(mcode):
sql = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
df = pd.read_sql(sql, conn, params=(mcode,))
return df
def get_info_t(mcode):
# 读取腾讯最后一行
sql = "SELECT * FROM cmf_quantn WHERE mcode = %s ORDER BY date DESC LIMIT 1"
df = pd.read_sql(sql, conn, params=(mcode,))
return df
def get_score(df):
if len(df)>90:
score=0
date=df.iloc[-1]['date']
score_text=[]
code=df.iloc[-1]['code']
for ru in rules:
if ru['check'](df):
score=score+ru['score']
score_text.append(ru['name'])
return [
df.iloc[-1]['id'],
code,
date,
score,
score_text,
]
return False
def update_score(info):
if info:
# 1. 创建连接
# conn = pymysql.connect(**config)
# cursor = conn.cursor()
try:
sql="UPDATE cmf_quantn SET score=%s,score_text=%s WHERE id=%s"
cursor.execute(sql, [info[3],json.dumps(info[4],ensure_ascii=False),info[0]])
conn.commit()
return True
except Exception as e:
conn.rollback() # 失败自动回滚
return False
# finally:
# cursor.close()
# conn.close()
return False
dfc = pd.read_csv('all.csv', dtype={'code': str})
for index, row in dfc.iterrows():
code=row['code']
c=time.time()
df=get_info(code)
df_t=get_info_t(code)
# 腾讯
if (not df.empty) and (not df_t.empty):
dfc=pd.concat([df,df_t],ignore_index=True)
num=len(dfc)
if num>90:
# #最新评分
dfc = gp.quant(dfc)
# print(dfc.iloc[-1])
# break
score_list=get_score(dfc)
print(score_list)
if score_list:
update_score(score_list)
data_list = df_to_datalist(df)
insert_count, skip_count = insert_data(data_list[-1:])
print(code,insert_count, skip_count)
break
# print(score_list[1],time.time()-c)
# #全量评分
# while len(df) >90:
# score_list=get_score(df)
# if score_list:
# update_score(score_list)
# df=df.iloc[:-1]
# print(time.time()-c)
cursor.close()
conn.close()