可以使用1分钟 或者5分钟线
买入点MA5 上穿MA10 必须上升趋势下,前提MA5 MA10 都是上升趋势下,在MA5 MA10下降去情况下慢慢MA5 穿MA10 不要买入,趋势反转处 附近K线 没有上影线。
卖出点MA5下穿MA10 可以在MA5 下穿MA10 前卖出 只要有足够利润,MA5下穿MA10时候已经是利润非最大话,可以在K线见顶 即多根 上影子线时候下手。
可以使用1分钟 或者5分钟线
买入点MA5 上穿MA10 必须上升趋势下,前提MA5 MA10 都是上升趋势下,在MA5 MA10下降去情况下慢慢MA5 穿MA10 不要买入,趋势反转处 附近K线 没有上影线。
卖出点MA5下穿MA10 可以在MA5 下穿MA10 前卖出 只要有足够利润,MA5下穿MA10时候已经是利润非最大话,可以在K线见顶 即多根 上影子线时候下手。
import pandas as pd
def first(df):
#流通市值 30 45 60 90 120 日均价 以及在4日内高点发生回撤时候的比率
df['lt']=df['amount']/(1000000*df['turn'])
df['hc30']=(df['high']-df['p30'])/df['p30']
df['hc45']=(df['high']-df['p45'])/df['p45']
df['hc60']=(df['high']-df['p60'])/df['p60']
df['hc90']=(df['high']-df['p90'])/df['p90']
df['hc120']=(df['high']-df['p120'])/df['p120']
is_pivot_high = df['high'] == df['high'].rolling(window=5, center=True).max()
high_df = df[is_pivot_high][['mcode','date','open','close','low','high','p30','p45','p60','p90','p120','amount','turn','lt','hc30','hc45','hc60','hc90','hc120']]
return high_df
results=[]
# 执行每个
results.append(first(df))
final_df=pd.concat(results,ignore_index=True)
final_df.to_csv("xxx.csv",index=False,encoding="utf-8-sig")
ADD
import pymysql
from pymysql.err import OperationalError, ProgrammingError
import pandas as pd
import time
def first(df):
#流通市值 30 45 60 90 120 日均价 以及在4日内高点发生回撤时候的比率
df['lt']=df['amount']/(1000000*df['turn'])
df['hc30']=(df['high']-df['p30'])/df['p30']
df['hc45']=(df['high']-df['p45'])/df['p45']
df['hc60']=(df['high']-df['p60'])/df['p60']
df['hc90']=(df['high']-df['p90'])/df['p90']
df['hc120']=(df['high']-df['p120'])/df['p120']
is_pivot_high = df['high'] == df['high'].rolling(window=5, center=True).max()
high_df = df[is_pivot_high][['mcode','date','open','close','low','high','p30','p45','p60','p90','p120','amount','turn','lt','hc30','hc45','hc60','hc90','hc120']]
return high_df
def get_all_data(mcode):
start = time.time()
sql = "SELECT * FROM cmf_quant5 WHERE mcode = %s AND date>2025-10-01 ORDER BY date ASC"
df= pd.read_sql(sql, conn, params=(mcode,))
# 清理异常值(必须保留)
df = df.replace([float('inf'), -float('inf')], 0)
df = df.fillna(0)
return df
# 数据库配置
config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "123456",
"database": "gupiao",
"charset": "utf8mb4"
}
conn = pymysql.connect(**config)
cursor = conn.cursor()
results=[]
df = pd.read_csv('/home/may/gupiao/all.csv', dtype={'code': str})
i=0
for index, row in df.iterrows():
df=get_all_data(row['code'])
results.append(first(df))
final_df=pd.concat(results,ignore_index=True)
final_df.to_csv("xxx.csv",index=False,encoding="utf-8-sig")
首先 获取pandas对象下的单个股票df 可以塞选,但是必须对于该股票之前到数据进行初步塞选
使用 pandas 判断 执行 买入 然后在相隔固定时间 一天 三天 五天 7天 10天 后的收盘价 记录 与之前买入价比较。若大于其中为胜。 且记录差值,因为是盘中数值 所以 使用当日收盘价 第二日开盘价比较
满足条件。
a当日买入 收盘价
b 下一日开盘价
比较后 1天 2天 3天 5天 7天 10天
更具多个股票信息分别统计 单因子规律
简化步骤
触发买入
触发卖出
止损卖出
倒入个股的 pandas数据
执行 买入信号 买入
盈利止盈 卖出atr 止损atr清仓
执行到最后一天看整体数据
也可以使用backtrader进行回测
import pandas as pd
import numpy as np
#价
def price(df):
return abs(df['high'].max()-df['low'].min())/df['low'].min()
def vper(df):
return (abs((df['close']-df['open'])/df['open'])<=0.02).mean()
#量 前后对比
def volume(df_pre,df):
return df_pre['volume'].mean()>=2*df['volume'].mean()
#红肥绿
def redgreen(df):
red=(df['close']>=df['open'])
green=(df['close']<df['open'])
return df[red]['volume'].sum()>=2*df[green]['volume'].sum()
#涨跌段量比
def vol_zd(df, min_days=1):
df = df.copy()
df['flag'] = np.where(df['close'] > df['open'], 1,
np.where(df['close'] < df['open'], -1, 0))
df['group'] = (df['flag'] != df['flag'].shift()).cumsum()
seg_info = []
for _, g in df.groupby('group'):
f = g['flag'].iloc[0]
if f in (1, -1) and len(g) >= min_days:
seg_info.append((f, g['volume'].mean()))
pair_list = []
for i in range(len(seg_info)-1):
curr_f, curr_v = seg_info[i]
next_f, next_v = seg_info[i+1]
if curr_f == 1 and next_f == -1:
pair_list.append((curr_v, next_v))
if not pair_list:
return False, [], 0.0
each_ok = [up > down for up, down in pair_list]
pass_rate = sum(each_ok) / len(each_ok)
all_pass = all(each_ok)
return all_pass, each_ok, pass_rate
def merge_intervals(intervals):
"""合并重叠或相邻的区间,intervals: [(start, end), ...],左闭右开"""
if not intervals:
return []
intervals.sort(key=lambda x: x[0]) # 按起点排序
merged = [list(intervals[0])] # 用列表方便修改
for start, end in intervals[1:]:
if start <= merged[-1][1]: # 重叠或刚好相邻
merged[-1][1] = max(merged[-1][1], end)
else:
merged.append([start, end])
return [tuple(m) for m in merged] # 转回元组
def cpdf(df,k=60):
valid_intervals = []
for i in range(k,len(df)-k-1):
pre_df=df.iloc[i-k:i].copy()
next_df=df.iloc[i:i+k].copy()
#价
if price(next_df)<0.15:
continue
if vper(next_df)<0.7:
continue
#量
if not volume(pre_df,next_df):
continue
if not redgreen(next_df):
continue
a,b,c=vol_zd(next_df)
if c<0.7:
continue
valid_intervals.append((i, i + k))
if not valid_intervals:
return []
# 合并区间(使用之前的通用函数)
merged_rows = merge_intervals(valid_intervals) # 例如 [(100, 220), (350, 500)]
# 将行号区间转换为日期区间(方便阅读)
result = []
for start_row, end_row in merged_rows:
start_date = df.iloc[start_row]['date'] # 假设date列是日期
end_date = df.iloc[end_row - 1]['date'] # 因为右开,取前一行日期
result.append((start_row, end_row, start_date, end_date))
return result
def z(df):
high_jugg=(
(df['high']>df['high'].shift(1))&(df['high']>df['high'].shift(2))&(df['high']>df['high'].shift(3))&(df['high']>df['high'].shift(4))&
(df['high']>df['high'].shift(-1))&(df['high']>df['high'].shift(-2))&(df['high']>df['high'].shift(-3))&(df['high']>df['high'].shift(-4))
)
low_jugg=(
(df['low']<df['low'].shift(1))&(df['low']<df['low'].shift(2))&(df['low']<df['low'].shift(3))&(df['low']<df['low'].shift(4))&
(df['low']<df['low'].shift(-1))&(df['low']<df['low'].shift(-2))&(df['low']<df['low'].shift(-3))&(df['low']<df['low'].shift(-4))
)
hp=df[high_jugg]
lp=df[low_jugg]
if len(hp)<2 or len(lp)<2:
return False
point_max=max(len(hp),len(lp))
point_min=min(len(hp),len(lp))
for i in range(point_min-1):
h_index=hp.index[i]
l_index=lp.index[i]
if abs(hp.iloc[i+1]['high']-hp.iloc[i]['high'])/hp.iloc[i]['high']>0.03:
return False
if abs(lp.iloc[i+1]['low']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.03:
return False
if (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.15 or (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']<0.08:
return False
if 1.8*df['vol_ma20'].shift(1).loc[h_index]>hp.iloc[i]['volume']:
return False
if 1.8*df['vol_ma20'].shift(1).loc[l_index]>lp.iloc[i]['volume']:
return False
return [hp,lp]