one tquant

one

import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import copy
import warnings
warnings.filterwarnings("ignore")  # 关掉烦人的警告

# 数据库配置
config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "db",
    "charset": "utf8mb4"
}

conn = pymysql.connect(**config)
cursor = conn.cursor()



def get_info(mcode):
    sql = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    df = pd.read_sql(sql, conn, params=(mcode,))
    return df


# ===================== ✅ 极速插入:INSERT IGNORE =====================


def df_to_datalist(df):
    insert_list = []
    for _, row in df.iterrows():
        data = (
            row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
            row['turn'], row['isST'], row['adjustflag'], row['ma5'], row['ma10'], row['ma20'], row['ma60'], row['vol_ma5'], row['vol_ma10'], row['vol_ma20'],
            row['vol_ma60'], row['tr'], row['atr'], row['rs_6'], row['rsi_6'], row['rs_14'], row['rsi_14'], row['money_ma5'], row['status'], row['pp'],
            row['kk'], row['ema12'], row['ema26'], row['dif'], row['dea'], row['macd'], row['rsv'], row['k'], row['d'], row['j']
        )
        insert_list.append(data)
    return insert_list

def insert_data(data_list):
    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
        , `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
        , `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ) """
    
    # 直接批量插入,重复自动跳过!
    cursor.executemany(sql, data_list)
    conn.commit()

    # affected rows 就是插入成功的数量
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count

def insert_datan(data_list):
    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`,`is_now`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s
        ) """
    
    # 直接批量插入,重复自动跳过!
    cursor.executemany(sql, data_list)
    conn.commit()

    # affected rows 就是插入成功的数量
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count



def do_coden(exchange, code):
    start = time.time()
    dlx = gp.get_today(exchange, code)
    # print(dlx)
    if dlx:
        row=dlx[0]
        dx=[
            row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
            row['turn'], row['isST'], row['adjustflag'],int(time.time())
        ]
        # print(dx)
        insert_count, skip_count = insert_datan([dx])
        
        print(f"股票 {code} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")

def insert_all(info,code_str):
    start=time.time()
    if info:
        dx=[]
        for m in info:
            row=m[0]
            cv=copy.deepcopy(row)
            cv['mcode']=row['code'].strip()[4:10]
            cv['code']=row['code'].strip()[2:4]+"."+row['code'].strip()[4:10]
            df_b=pd.DataFrame([cv])
            df_a=get_info(cv['mcode'])
            df=pd.concat([df_a,df_b],ignore_index=True)
            df=gp.quant(df)
            df = df.replace([float('inf'), -float('inf')], 0)
            df = df.fillna(0)

            data_list = df_to_datalist(df)
            insert_count, skip_count = insert_data(data_list[-1:])
    
            print(insert_count, skip_count)

            dx.append([
                row['code'].strip()[4:10], row['code'].strip()[2:4]+"."+row['code'].strip()[4:10], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
                row['turn'], row['isST'], row['adjustflag'],int(time.time())
            ])
        insert_count, skip_count = insert_datan(dx)
        print(f"股票 {code_str} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")

gp=GP(False)

# # info=gp.get_today("sh", "600000")
# # print(info)
# do_coden("sh", "600000")

df = pd.read_csv('all.csv', dtype={'code': str})
i=0
code_str=''
for index, row in df.iterrows():
    c=time.time()
    if i<25:
        code_str=row['exchange']+row['code']+","+code_str
        i=i+1
    else:
        print(code_str)
        ccc=gp.gp_all(code_str)
        i=0
        code_str=''
        insert_all(ccc,code_str)
        x=4-(time.time()-c)
        if x>0:
            time.sleep(x)
ccc=gp.gp_all(code_str)
insert_all(ccc,code_str)

tquant

import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import json
from rules import rules
import warnings
warnings.filterwarnings("ignore")  # 关掉烦人的警告

gp=GP(False)
# 数据库配置
config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "db",
    "charset": "utf8mb4"
}

conn = pymysql.connect(**config)
cursor = conn.cursor()

def df_to_datalist(df):
    insert_list = []
    for _, row in df.iterrows():
        data = (
            row['code'][3:], row['code'], row['date'], row['open'], row['high'], row['low'], row['close'], row['preclose'], row['volume'], row['pctChg'],
            row['turn'], row['isST'], row['adjustflag'], row['ma5'], row['ma10'], row['ma20'], row['ma60'], row['vol_ma5'], row['vol_ma10'], row['vol_ma20'],
            row['vol_ma60'], row['tr'], row['atr'], row['rs_6'], row['rsi_6'], row['rs_14'], row['rsi_14'], row['money_ma5'], row['status'], row['pp'],
            row['kk'], row['ema12'], row['ema26'], row['dif'], row['dea'], row['macd'], row['rsv'], row['k'], row['d'], row['j']
        )
        insert_list.append(data)
    return insert_list

def insert_data(data_list):
    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
        , `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
        , `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ) """
    
    # 直接批量插入,重复自动跳过!
    cursor.executemany(sql, data_list)
    conn.commit()

    # affected rows 就是插入成功的数量
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count



def get_info(mcode):
    sql = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    df = pd.read_sql(sql, conn, params=(mcode,))
    return df

def get_info_t(mcode):
    # 读取腾讯最后一行
    sql = "SELECT * FROM cmf_quantn WHERE mcode = %s ORDER BY date DESC LIMIT 1"
    df = pd.read_sql(sql, conn, params=(mcode,))
    return df

def get_score(df):
    if len(df)>90:
        score=0
        date=df.iloc[-1]['date']
        score_text=[]
        code=df.iloc[-1]['code']
        for ru in rules:
            if ru['check'](df):
                score=score+ru['score']
                score_text.append(ru['name'])
        return [
            df.iloc[-1]['id'],
            code,
            date,
            score,
            score_text,
        ]
    return False

def update_score(info):
    if info:
                # 1. 创建连接
        # conn = pymysql.connect(**config)
        # cursor = conn.cursor()
        try:
            sql="UPDATE cmf_quantn SET score=%s,score_text=%s  WHERE id=%s"
            cursor.execute(sql, [info[3],json.dumps(info[4],ensure_ascii=False),info[0]])
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()  # 失败自动回滚
            return False
        # finally:
        #     cursor.close()
        #     conn.close()
    return False

dfc = pd.read_csv('all.csv', dtype={'code': str})
for index, row in dfc.iterrows():
    code=row['code']
    c=time.time()
    df=get_info(code)
    df_t=get_info_t(code)
    # 腾讯
    if (not df.empty) and (not df_t.empty):
        dfc=pd.concat([df,df_t],ignore_index=True)
        num=len(dfc)
        if num>90:
            # #最新评分
            dfc = gp.quant(dfc)
            # print(dfc.iloc[-1])
            # break
            score_list=get_score(dfc)
            print(score_list)
            if score_list:
                update_score(score_list)
                data_list = df_to_datalist(df)
                insert_count, skip_count = insert_data(data_list[-1:])   
                print(code,insert_count, skip_count)
            break
    # print(score_list[1],time.time()-c)
            # #全量评分
            # while len(df) >90:
            #     score_list=get_score(df)
            #     if score_list:
            #         update_score(score_list)
            #     df=df.iloc[:-1]
            # print(time.time()-c)
cursor.close()
conn.close()
此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复