基础回测框架

核心思路:先计算筛选所需的指标列,再通过条件比较找出符合条件的行,把这些行及其后续三个交易日的数据写入新的 DataFrame,最后将各只股票的结果合并为一个 DataFrame。

import pymysql
from pymysql.err import OperationalError, ProgrammingError
import pandas as pd
import time
import numpy as np

# Database connection settings.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file that is not committed.
config = dict(
    host="localhost",
    port=3306,
    user="root",
    password="a123456",
    database="gupiao",
    charset="utf8mb4",
)

# Module-level connection (used by get_all_data) and a cursor opened on it.
conn = pymysql.connect(**config)
cursor = conn.cursor()

def get_all_data(mcode, start_date='2025-10-01'):
    """Load the ``cmf_quant6`` rows for one code, ordered by date ascending.

    Args:
        mcode: Value matched against the ``mcode`` column.
        start_date: Only rows whose ``date`` is strictly greater than this
            value are returned. The default is the cutoff that used to be
            hard-coded in the query, so existing callers are unaffected.

    Returns:
        A DataFrame with the matching rows, in which +/-inf and NaN have
        been replaced by 0.
    """
    sql = (
        "SELECT * FROM cmf_quant6 "
        "WHERE mcode = %s AND date > %s ORDER BY date ASC"
    )
    # Uses the module-level connection ``conn``.
    df = pd.read_sql(sql, conn, params=(mcode, start_date))
    # Clean up abnormal values (must be kept): infinities first, then NaN.
    df = df.replace([float('inf'), -float('inf')], 0)
    df = df.fillna(0)
    return df

def lb(df):
    """Pick signal rows and attach the following three rows' data to each.

    A row is a signal when all of these hold, where
    ``lb = 100 * (high - preclose) / preclose``:

    * ``lb > 9.9``
    * ``lb > pctChg``
    * ``pctChg > 5.5``

    For each signal row the ``open``, ``close``, ``high``, ``low``, ``turn``
    and ``pctChg`` values of the next 1, 2 and 3 rows are added as
    ``<column>_1`` .. ``<column>_3``. Four ratios relative to the next row's
    open are then derived:

    * ``OO21`` / ``OO31``: (open_2 - open_1) / open_1, (open_3 - open_1) / open_1
    * ``OC21`` / ``OC31``: (close_2 - open_1) / open_1, (close_3 - open_1) / open_1

    Args:
        df: DataFrame with ``high``, ``preclose``, ``pctChg``, ``open``,
            ``close``, ``low`` and ``turn`` columns. "Next" rows are taken
            by position, so the rows are expected to already be in the
            desired (chronological) order. The input is not modified.

    Returns:
        A new DataFrame holding the signal rows (original index labels
        kept) with the input columns, ``lb``, the shifted columns and the
        four ratios. Rows containing any NaN are dropped, which removes
        signals that have fewer than three following rows.
    """
    # Computed as a standalone Series so the caller's frame is left untouched.
    lb_pct = 100 * (df['high'] - df['preclose']) / df['preclose']
    mask = (lb_pct > df['pctChg']) & (lb_pct > 9.9) & (df['pctChg'] > 5.5)

    result_df = df[mask].copy()
    result_df['lb'] = lb_pct[mask]

    # Shift on the full frame (not the filtered one) so "next row" means the
    # next row of the input, then keep only the values at signal rows.
    for offset in (1, 2, 3):
        for col in ('open', 'close', 'high', 'low', 'turn', 'pctChg'):
            shifted = df[col].shift(-offset)
            result_df['{}_{}'.format(col, offset)] = shifted[mask]

    entry = result_df['open_1']
    result_df['OO21'] = (result_df['open_2'] - entry) / entry
    result_df['OO31'] = (result_df['open_3'] - entry) / entry

    result_df['OC21'] = (result_df['close_2'] - entry) / entry
    result_df['OC31'] = (result_df['close_3'] - entry) / entry

    return result_df.dropna()
    
    
# Driver: run lb() over every code listed in all.csv and write the combined
# signal rows to lanban2.csv.
results = []
try:
    # Codes are read as strings so that leading zeros are preserved.
    codes_df = pd.read_csv('/home/may/gupiao/all.csv', dtype={'code': str})
    i = 0
    for code in codes_df['code']:
        # Codes starting with "30" or "68" are excluded.
        if code.startswith(("30", "68")):
            continue
        # Progress counter: number of codes processed so far.
        print(i)
        i = i + 1
        results.append(lb(get_all_data(code)))

    if results:
        final_df = pd.concat(results, ignore_index=True)
        final_df.to_csv("lanban2.csv", index=False, encoding="utf-8-sig")
    else:
        # pd.concat raises ValueError on an empty list, so skip the export.
        print("no codes processed; lanban2.csv not written")
finally:
    # Release the database resources even if a query or the export fails.
    cursor.close()
    conn.close()
此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复