日内T 经验

可以使用1分钟 或者5分钟线

买入点MA5 上穿MA10 必须上升趋势下,前提MA5 MA10 都是上升趋势下,在MA5 MA10下降去情况下慢慢MA5 穿MA10 不要买入,趋势反转处 附近K线 没有上影线。

卖出点MA5下穿MA10 可以在MA5 下穿MA10 前卖出 只要有足够利润,MA5下穿MA10时候已经是利润非最大话,可以在K线见顶 即多根 上影子线时候下手。

发表在 None | 留下评论

回测 回撤 hline

import pandas as pd

def first(df):
    #流通市值 30 45 60 90 120 日均价 以及在4日内高点发生回撤时候的比率
    df['lt']=df['amount']/(1000000*df['turn'])
    df['hc30']=(df['high']-df['p30'])/df['p30']
    df['hc45']=(df['high']-df['p45'])/df['p45']
    df['hc60']=(df['high']-df['p60'])/df['p60']
    df['hc90']=(df['high']-df['p90'])/df['p90']
    df['hc120']=(df['high']-df['p120'])/df['p120']
    is_pivot_high = df['high'] == df['high'].rolling(window=5, center=True).max()
    high_df = df[is_pivot_high][['mcode','date','open','close','low','high','p30','p45','p60','p90','p120','amount','turn','lt','hc30','hc45','hc60','hc90','hc120']]
    return high_df

        
results=[]
# 执行每个
results.append(first(df))

final_df=pd.concat(results,ignore_index=True)
final_df.to_csv("xxx.csv",index=False,encoding="utf-8-sig")

ADD

import pymysql
from pymysql.err import OperationalError, ProgrammingError
import pandas as pd
import time



def first(df):
    #流通市值 30 45 60 90 120 日均价 以及在4日内高点发生回撤时候的比率
    df['lt']=df['amount']/(1000000*df['turn'])
    df['hc30']=(df['high']-df['p30'])/df['p30']
    df['hc45']=(df['high']-df['p45'])/df['p45']
    df['hc60']=(df['high']-df['p60'])/df['p60']
    df['hc90']=(df['high']-df['p90'])/df['p90']
    df['hc120']=(df['high']-df['p120'])/df['p120']
    is_pivot_high = df['high'] == df['high'].rolling(window=5, center=True).max()
    high_df = df[is_pivot_high][['mcode','date','open','close','low','high','p30','p45','p60','p90','p120','amount','turn','lt','hc30','hc45','hc60','hc90','hc120']]
    return high_df

def get_all_data(mcode):
    start = time.time()
    sql = "SELECT * FROM cmf_quant5 WHERE mcode = %s AND date>2025-10-01 ORDER BY date ASC"
    df= pd.read_sql(sql, conn, params=(mcode,))
     # 清理异常值(必须保留)
    df = df.replace([float('inf'), -float('inf')], 0)
    df = df.fillna(0)
    return df


# 数据库配置
config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "gupiao",
    "charset": "utf8mb4"
}

conn = pymysql.connect(**config)
cursor = conn.cursor()

results=[]
df = pd.read_csv('/home/may/gupiao/all.csv', dtype={'code': str})
i=0
for index, row in df.iterrows():
    df=get_all_data(row['code'])
    results.append(first(df))
final_df=pd.concat(results,ignore_index=True)
final_df.to_csv("xxx.csv",index=False,encoding="utf-8-sig")
发表在 None | 留下评论

基础回测

首先 获取pandas对象下的单个股票df 可以塞选,但是必须对于该股票之前到数据进行初步塞选

使用 pandas 判断 执行 买入 然后在相隔固定时间 一天 三天 五天 7天 10天 后的收盘价 记录 与之前买入价比较若大于其中为胜 且记录差值因为是盘中数值 所以 使用当日收盘价 第二日开盘价比较

满足条件。

a当日买入 收盘价

b 下一日开盘价

比较后 1天 2天 3天 5天 7天 10天

更具多个股票信息分别统计 单因子规律


简化步骤

触发买入

触发卖出

损卖出

倒入个股的 pandas数据

执行 买入信号 买入

盈利止盈 卖出atr 止损atr清仓

执行到最后一天看整体数据

也可以使用backtrader进行回测

发表在 None | 留下评论

震荡区间识别

import pandas as pd
import numpy as np


#价
def price(df):
    return abs(df['high'].max()-df['low'].min())/df['low'].min()

def vper(df):
    return (abs((df['close']-df['open'])/df['open'])<=0.02).mean()

#量 前后对比
def volume(df_pre,df):
    return df_pre['volume'].mean()>=2*df['volume'].mean()
#红肥绿
def redgreen(df):
    red=(df['close']>=df['open'])
    green=(df['close']<df['open'])
    return df[red]['volume'].sum()>=2*df[green]['volume'].sum()
#涨跌段量比 
def vol_zd(df, min_days=1):
    df = df.copy()
    df['flag'] = np.where(df['close'] > df['open'], 1,
                 np.where(df['close'] < df['open'], -1, 0))
    df['group'] = (df['flag'] != df['flag'].shift()).cumsum()
    
    seg_info = []
    for _, g in df.groupby('group'):
        f = g['flag'].iloc[0]
        if f in (1, -1) and len(g) >= min_days:
            seg_info.append((f, g['volume'].mean()))
    
    pair_list = []
    for i in range(len(seg_info)-1):
        curr_f, curr_v = seg_info[i]
        next_f, next_v = seg_info[i+1]
        if curr_f == 1 and next_f == -1:
            pair_list.append((curr_v, next_v))
    
    if not pair_list:
        return False, [], 0.0
    
    each_ok = [up > down for up, down in pair_list]
    pass_rate = sum(each_ok) / len(each_ok)
    all_pass = all(each_ok)
    return all_pass, each_ok, pass_rate

def merge_intervals(intervals):
    """合并重叠或相邻的区间,intervals: [(start, end), ...],左闭右开"""
    if not intervals:
        return []
    intervals.sort(key=lambda x: x[0])          # 按起点排序
    merged = [list(intervals[0])]               # 用列表方便修改
    for start, end in intervals[1:]:
        if start <= merged[-1][1]:              # 重叠或刚好相邻
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [tuple(m) for m in merged]           # 转回元组

def cpdf(df,k=60):
    valid_intervals = []
    for i in range(k,len(df)-k-1):
        pre_df=df.iloc[i-k:i].copy()
        next_df=df.iloc[i:i+k].copy()
        #价
        if price(next_df)<0.15:
            continue
        if vper(next_df)<0.7:
            continue
        #量
        if not volume(pre_df,next_df):
            continue
        if not redgreen(next_df):
            continue
        a,b,c=vol_zd(next_df)
        if c<0.7:
            continue
        valid_intervals.append((i, i + k))
    
    if not valid_intervals:
        return []
    
    # 合并区间(使用之前的通用函数)
    merged_rows = merge_intervals(valid_intervals)  # 例如 [(100, 220), (350, 500)]
    
    # 将行号区间转换为日期区间(方便阅读)
    result = []
    for start_row, end_row in merged_rows:
        start_date = df.iloc[start_row]['date']   # 假设date列是日期
        end_date = df.iloc[end_row - 1]['date']   # 因为右开,取前一行日期
        result.append((start_row, end_row, start_date, end_date))
    
    return result



发表在 None | 留下评论

TEST 箱体区间

def z(df):
    high_jugg=(
            (df['high']>df['high'].shift(1))&(df['high']>df['high'].shift(2))&(df['high']>df['high'].shift(3))&(df['high']>df['high'].shift(4))&
            (df['high']>df['high'].shift(-1))&(df['high']>df['high'].shift(-2))&(df['high']>df['high'].shift(-3))&(df['high']>df['high'].shift(-4))
        )
    low_jugg=(
        (df['low']<df['low'].shift(1))&(df['low']<df['low'].shift(2))&(df['low']<df['low'].shift(3))&(df['low']<df['low'].shift(4))&
        (df['low']<df['low'].shift(-1))&(df['low']<df['low'].shift(-2))&(df['low']<df['low'].shift(-3))&(df['low']<df['low'].shift(-4))
    )
    hp=df[high_jugg]
    lp=df[low_jugg]
    if len(hp)<2 or len(lp)<2:
        return False
    point_max=max(len(hp),len(lp))
    point_min=min(len(hp),len(lp))
    for i in range(point_min-1):
        h_index=hp.index[i]
        l_index=lp.index[i]

        if abs(hp.iloc[i+1]['high']-hp.iloc[i]['high'])/hp.iloc[i]['high']>0.03:
            return False
        if abs(lp.iloc[i+1]['low']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.03:
            return False

        if (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.15 or (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']<0.08:
            return False
        
        if 1.8*df['vol_ma20'].shift(1).loc[h_index]>hp.iloc[i]['volume']:
            return False
        if 1.8*df['vol_ma20'].shift(1).loc[l_index]>lp.iloc[i]['volume']:
            return False
    return [hp,lp]
发表在 None | 留下评论