import pandas as pd
import numpy as np
# Price
def price(df):
    """Return the relative width of the price range covered by ``df``.

    The result is ``|max(high) - min(low)| / min(low)``, i.e. the distance
    between the highest ``high`` and the lowest ``low`` expressed as a
    fraction of the lowest ``low``.

    Args:
        df: DataFrame providing ``high`` and ``low`` columns.

    Returns:
        The range-to-low ratio as a scalar.
    """
    highest = df['high'].max()
    lowest = df['low'].min()
    return abs(highest - lowest) / lowest
def vper(df):
    """Return the share of rows whose open-to-close move is at most 2%.

    For every row the relative body size ``|close - open| / open`` is
    computed; the function returns the fraction of rows where that value
    is ``<= 0.02``.

    Args:
        df: DataFrame providing ``open`` and ``close`` columns.

    Returns:
        A value between 0 and 1 (the mean of the boolean mask).
    """
    closes = df['close']
    opens = df['open']
    body_ratio = ((closes - opens) / opens).abs()
    is_small = body_ratio <= 0.02
    return is_small.mean()
# Volume: compare the preceding window with the current one
def volume(df_pre, df):
    """Tell whether the earlier window traded at least twice the volume.

    Args:
        df_pre: DataFrame for the preceding window; needs a ``volume`` column.
        df: DataFrame for the current window; needs a ``volume`` column.

    Returns:
        True when the mean ``volume`` of ``df_pre`` is greater than or equal
        to two times the mean ``volume`` of ``df``.
    """
    prior_avg = df_pre['volume'].mean()
    current_avg = df['volume'].mean()
    return prior_avg >= 2 * current_avg
# "Fat red, thin green": heavy volume on up bars, light volume on down bars
def redgreen(df):
    """Check that up-bar volume is at least double the down-bar volume.

    A row counts as "red" when ``close >= open`` and as "green" when
    ``close < open``.

    Args:
        df: DataFrame providing ``open``, ``close`` and ``volume`` columns.

    Returns:
        True when the summed ``volume`` of red rows is greater than or equal
        to two times the summed ``volume`` of green rows.
    """
    is_red = df['close'] >= df['open']
    is_green = df['close'] < df['open']
    red_total = df.loc[is_red, 'volume'].sum()
    green_total = df.loc[is_green, 'volume'].sum()
    return red_total >= 2 * green_total
# Volume comparison between rising segments and the falling segments after them
def vol_zd(df, min_days=1):
    """Compare mean volume of each rising run with the falling run after it.

    Rows are labelled 1 (``close > open``), -1 (``close < open``) or 0
    (neither) and split into runs of consecutive identical labels. Runs
    labelled 0 and runs shorter than ``min_days`` rows are discarded. Among
    the remaining runs, every rising run immediately followed (in the
    filtered list) by a falling run forms a pair, and the pair passes when
    the rising run's mean ``volume`` is strictly greater than the falling
    run's mean ``volume``.

    Args:
        df: DataFrame providing ``open``, ``close`` and ``volume`` columns.
            It is not modified.
        min_days: Minimum number of rows a run must contain to be kept.

    Returns:
        A tuple ``(all_pass, each_ok, pass_rate)``: whether every pair
        passed, the per-pair results in order, and the fraction of pairs
        that passed. When no pair exists the result is ``(False, [], 0.0)``.
    """
    direction = np.select(
        [df['close'] > df['open'], df['close'] < df['open']],
        [1, -1],
        default=0,
    )
    # assign() works on a new frame, so the caller's data stays untouched.
    work = df.assign(flag=direction)
    # A new run starts wherever the label differs from the previous row.
    work['group'] = work['flag'].ne(work['flag'].shift()).cumsum()

    segments = []
    for _, chunk in work.groupby('group'):
        label = chunk['flag'].iloc[0]
        if label == 0 or len(chunk) < min_days:
            continue
        segments.append((label, chunk['volume'].mean()))

    # Pair each kept run with its successor; keep only rise -> fall pairs.
    pairs = [
        (rise_vol, fall_vol)
        for (lead, rise_vol), (follow, fall_vol) in zip(segments, segments[1:])
        if lead == 1 and follow == -1
    ]
    if not pairs:
        return False, [], 0.0

    each_ok = [rise_vol > fall_vol for rise_vol, fall_vol in pairs]
    all_pass = all(each_ok)
    pass_rate = sum(each_ok) / len(each_ok)
    return all_pass, each_ok, pass_rate
def cpdf(df,k=60):
for i in range(k,len(df)-k-1):
pre_df=df.iloc[i-k:i].copy()
next_df=df.iloc[i:i+k].copy()
#价
if price(next_df)<0.15:
continue
if vper(next_df)<0.7:
continue
#量
if not volume(pre_df,next_df):
continue
if not redgreen(next_df):
continue
a,b,c=vol_zd(next_df)
if c<0.7:
continue