震荡区间识别

import pandas as pd
import numpy as np


#价
def price(df):
    return abs(df['high'].max()-df['low'].min())/df['low'].min()

def vper(df):
    return (abs((df['close']-df['open'])/df['open'])<=0.02).mean()

#量 前后对比
def volume(df_pre,df):
    return df_pre['volume'].mean()>=2*df['volume'].mean()
#红肥绿
def redgreen(df):
    red=(df['close']>=df['open'])
    green=(df['close']<df['open'])
    return df[red]['volume'].sum()>=2*df[green]['volume'].sum()
#涨跌段量比 
def vol_zd(df, min_days=1):
    df = df.copy()
    df['flag'] = np.where(df['close'] > df['open'], 1,
                 np.where(df['close'] < df['open'], -1, 0))
    df['group'] = (df['flag'] != df['flag'].shift()).cumsum()
    
    seg_info = []
    for _, g in df.groupby('group'):
        f = g['flag'].iloc[0]
        if f in (1, -1) and len(g) >= min_days:
            seg_info.append((f, g['volume'].mean()))
    
    pair_list = []
    for i in range(len(seg_info)-1):
        curr_f, curr_v = seg_info[i]
        next_f, next_v = seg_info[i+1]
        if curr_f == 1 and next_f == -1:
            pair_list.append((curr_v, next_v))
    
    if not pair_list:
        return False, [], 0.0
    
    each_ok = [up > down for up, down in pair_list]
    pass_rate = sum(each_ok) / len(each_ok)
    all_pass = all(each_ok)
    return all_pass, each_ok, pass_rate


def cpdf(df,k=60):
    for i in range(k,len(df)-k-1):
        pre_df=df.iloc[i-k:i].copy()
        next_df=df.iloc[i:i+k].copy()
        #价
        if price(next_df)<0.15:
            continue
        if vper(next_df)<0.7:
            continue
        #量
        if not volume(pre_df,next_df):
            continue
        if not redgreen(next_df):
            continue
        a,b,c=vol_zd(next_df)
        if c<0.7:
            continue

此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复