简化步骤
触发买入
触发卖出
止损卖出
倒入个股的 pandas数据
执行 买入信号 买入
盈利止盈 卖出atr 止损atr清仓
执行到最后一天看整体数据
也可以使用backtrader进行回测
简化步骤
触发买入
触发卖出
止损卖出
倒入个股的 pandas数据
执行 买入信号 买入
盈利止盈 卖出atr 止损atr清仓
执行到最后一天看整体数据
也可以使用backtrader进行回测
import pandas as pd
import numpy as np
#价
def price(df):
return abs(df['high'].max()-df['low'].min())/df['low'].min()
def vper(df):
return (abs((df['close']-df['open'])/df['open'])<=0.02).mean()
#量 前后对比
def volume(df_pre,df):
return df_pre['volume'].mean()>=2*df['volume'].mean()
#红肥绿
def redgreen(df):
red=(df['close']>=df['open'])
green=(df['close']<df['open'])
return df[red]['volume'].sum()>=2*df[green]['volume'].sum()
#涨跌段量比
def vol_zd(df, min_days=1):
df = df.copy()
df['flag'] = np.where(df['close'] > df['open'], 1,
np.where(df['close'] < df['open'], -1, 0))
df['group'] = (df['flag'] != df['flag'].shift()).cumsum()
seg_info = []
for _, g in df.groupby('group'):
f = g['flag'].iloc[0]
if f in (1, -1) and len(g) >= min_days:
seg_info.append((f, g['volume'].mean()))
pair_list = []
for i in range(len(seg_info)-1):
curr_f, curr_v = seg_info[i]
next_f, next_v = seg_info[i+1]
if curr_f == 1 and next_f == -1:
pair_list.append((curr_v, next_v))
if not pair_list:
return False, [], 0.0
each_ok = [up > down for up, down in pair_list]
pass_rate = sum(each_ok) / len(each_ok)
all_pass = all(each_ok)
return all_pass, each_ok, pass_rate
def merge_intervals(intervals):
"""合并重叠或相邻的区间,intervals: [(start, end), ...],左闭右开"""
if not intervals:
return []
intervals.sort(key=lambda x: x[0]) # 按起点排序
merged = [list(intervals[0])] # 用列表方便修改
for start, end in intervals[1:]:
if start <= merged[-1][1]: # 重叠或刚好相邻
merged[-1][1] = max(merged[-1][1], end)
else:
merged.append([start, end])
return [tuple(m) for m in merged] # 转回元组
def cpdf(df,k=60):
valid_intervals = []
for i in range(k,len(df)-k-1):
pre_df=df.iloc[i-k:i].copy()
next_df=df.iloc[i:i+k].copy()
#价
if price(next_df)<0.15:
continue
if vper(next_df)<0.7:
continue
#量
if not volume(pre_df,next_df):
continue
if not redgreen(next_df):
continue
a,b,c=vol_zd(next_df)
if c<0.7:
continue
valid_intervals.append((i, i + k))
if not valid_intervals:
return []
# 合并区间(使用之前的通用函数)
merged_rows = merge_intervals(valid_intervals) # 例如 [(100, 220), (350, 500)]
# 将行号区间转换为日期区间(方便阅读)
result = []
for start_row, end_row in merged_rows:
start_date = df.iloc[start_row]['date'] # 假设date列是日期
end_date = df.iloc[end_row - 1]['date'] # 因为右开,取前一行日期
result.append((start_row, end_row, start_date, end_date))
return result
def z(df):
high_jugg=(
(df['high']>df['high'].shift(1))&(df['high']>df['high'].shift(2))&(df['high']>df['high'].shift(3))&(df['high']>df['high'].shift(4))&
(df['high']>df['high'].shift(-1))&(df['high']>df['high'].shift(-2))&(df['high']>df['high'].shift(-3))&(df['high']>df['high'].shift(-4))
)
low_jugg=(
(df['low']<df['low'].shift(1))&(df['low']<df['low'].shift(2))&(df['low']<df['low'].shift(3))&(df['low']<df['low'].shift(4))&
(df['low']<df['low'].shift(-1))&(df['low']<df['low'].shift(-2))&(df['low']<df['low'].shift(-3))&(df['low']<df['low'].shift(-4))
)
hp=df[high_jugg]
lp=df[low_jugg]
if len(hp)<2 or len(lp)<2:
return False
point_max=max(len(hp),len(lp))
point_min=min(len(hp),len(lp))
for i in range(point_min-1):
h_index=hp.index[i]
l_index=lp.index[i]
if abs(hp.iloc[i+1]['high']-hp.iloc[i]['high'])/hp.iloc[i]['high']>0.03:
return False
if abs(lp.iloc[i+1]['low']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.03:
return False
if (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']>0.15 or (hp.iloc[i]['high']-lp.iloc[i]['low'])/lp.iloc[i]['low']<0.08:
return False
if 1.8*df['vol_ma20'].shift(1).loc[h_index]>hp.iloc[i]['volume']:
return False
if 1.8*df['vol_ma20'].shift(1).loc[l_index]>lp.iloc[i]['volume']:
return False
return [hp,lp]
import pandas as pd
import numpy as np
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from matplotlib.lines import Line2D
import time
# 全局配置:解决中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei', 'WenQuanYi Zen Hei', 'Heiti TC', 'sans-serif']
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
class chip:
def __init__(self, k_day, k_30):
self.k_30 = k_30.copy()
self.k_day = k_day.copy()
self.k_30_result = None
self.avg_price = 0
self.k_box = None
self.box_chip_dist = None
self.show_type=0
self.day=0
self.code=k_day.iloc[0]['mcode']
# 日期格式化
self.k_30['date'] = pd.to_datetime(self.k_30['date'])
self.k_30['datetime'] = pd.to_datetime(self.k_30['datetime'])
self.k_day['date'] = pd.to_datetime(self.k_day['date'])
self.k_day['turn'] = self.k_day['turn'] / 100
# if self.k_day['turn'].median() > 1:
# self.k_day['turn'] = self.k_day['turn'] / 100
# print(f"换手率已转换:百分比 → 小数")
# else:
# print(f"换手率已是小数格式,无需转换")
# 排序
self.k_30 = self.k_30.sort_values('datetime', ascending=True).reset_index(drop=True)
self.k_day = self.k_day.sort_values('date', ascending=True).reset_index(drop=True)
def day_cmp(self):
s_date = self.k_30.iloc[0]['date']
self.k_day_copy = self.k_day.loc[self.k_day['date'] >= s_date].copy()
self.k_day_copy['today'] = self.k_day_copy['amount'] / self.k_day_copy['volume']
self.k_day_copy['yesterday'] = self.k_day_copy['amount'].shift(1) / self.k_day_copy['volume'].shift(1)
#0.002判断 即0.2%以内横盘
if self.show_type==2:
self.k_day_copy['k'] = 0.8
# self.k_day_copy.loc[self.k_day_copy['today']<self.k_day_copy['yesterday'], 'k'] = 0.65
# self.k_day_copy.loc[self.k_day_copy['today']>self.k_day_copy['yesterday'], 'k'] = 0.95
self.k_day_copy.loc[(self.k_day_copy['today']-self.k_day_copy['yesterday'])/self.k_day_copy['yesterday']< -0.002, 'k'] = 0.65
self.k_day_copy.loc[(self.k_day_copy['today']-self.k_day_copy['yesterday'])/self.k_day_copy['yesterday']> 0.002, 'k'] = 0.95
elif self.show_type==1:
self.k_day_copy['k']=1
else:
self.k_day_copy['k']=1
turn_arr = self.k_day_copy['turn'].values
k_arr = self.k_day_copy['k'].values
if self.show_type>0:
factor_arr = 1 - turn_arr * k_arr
else:
factor_arr = 1- turn_arr * k_arr*0
n = len(factor_arr)
k_mat = np.tile(factor_arr, (n, 1))
mask = np.triu(np.ones((n, n), dtype=bool), k=1)
k_mat = np.where(mask, k_mat, 1.0)
mat = np.cumprod(k_mat, axis=1)
final_retention = mat[:, -1]
self.k_day_copy['cmp'] = final_retention
self.merge_day_cmp_to_30m()
self.k_30_result['cmp_volume'] = self.k_30_result['cmp'] * self.k_30_result['volume']
self.k_30_result['avg'] = self.k_30_result['amount'] / self.k_30_result['volume']
self.k_30_result['cmp_amount'] = self.k_30_result['avg'] * self.k_30_result['cmp_volume']
total_vol = self.k_30_result['cmp_volume'].sum()
total_amt = self.k_30_result['cmp_amount'].sum()
self.avg_price = total_amt / total_vol if total_vol > 0 else 0
def merge_day_cmp_to_30m(self):
day_cmp = self.k_day_copy[['date', 'cmp']].copy()
self.k_30_result = self.k_30.copy()
self.k_30_result = self.k_30_result.merge(day_cmp, on='date', how='left')
# ====================== 【重要修改】交易日箱体 ======================
def box(self, box_days=120, end_date=None):
"""
按 交易日 切分箱体(正确版)
"""
self.day=box_days
df_day = self.k_day.copy()
df_30 = self.k_30_result.copy()
# 1. 确定结束日期
if end_date is None:
end_date = df_day['date'].max()
else:
end_date = pd.to_datetime(end_date)
if end_date > df_day['date'].max():
end_date = df_day['date'].max()
# 2. 取到 end_date 为止的所有日K
df_day_cut = df_day[df_day['date'] <= end_date].copy()
# 3. 取 最近 N 个 交易日
if len(df_day_cut) >= box_days:
df_day_box = df_day_cut.tail(box_days)
else:
df_day_box = df_day_cut
start_date = df_day_box['date'].min()
# 4. 用真实交易日区间 筛选 30分钟K线
df_box = df_30[(df_30['date'] >= start_date) & (df_30['date'] <= end_date)]
# 计算箱体成本
total_vol = df_box['cmp_volume'].sum()
if total_vol == 0:
box_cost = 0
else:
box_cost = (df_box['avg'] * df_box['cmp_volume']).sum() / total_vol
self.k_box = df_box
return box_cost
def get_map(self, bins=80):
if self.k_box is None or self.k_box.empty:
raise Exception("先调用 box() 计算箱体")
df_box = self.k_box.copy()
df_box['price_range'] = pd.cut(df_box['avg'], bins=bins)
dist = df_box.groupby('price_range')['cmp_volume'].sum().reset_index()
total = dist['cmp_volume'].sum()
dist['percent'] = dist['cmp_volume'] / total * 100 if total > 0 else 0
# 最简单最稳定的写法,绝对不报错
price_left = []
price_right = []
for interval in dist['price_range']:
l = interval.left
r = interval.right
price_left.append(float(l))
price_right.append(float(r))
dist['price_left'] = price_left
dist['price_right'] = price_right
dist['price_mid'] = (dist['price_left'] + dist['price_right']) / 2
self.box_chip_dist = dist
return dist
def plot_box_chip_top3(c, dist, save_path=None,min_percent=0.01, dpi=300):
"""
1. 过滤占比低于 min_percent 极细小筹码
2. 纵轴固定10刻度,减少毛刺拥挤
3. TOP1红/TOP2橙/TOP3绿高亮
4. 右侧竖表展示过滤后前15名
5. 表格前三名底色与图形配色一致
6. 新增:筹码总占比 + 箱体总成交量
"""
code = c.code
day = c.day
if c.show_type == 0:
show_type = "「堆叠筹码」"
elif c.show_type == 1:
show_type = "「恒定衰减系数K=1」"
elif c.show_type == 2:
show_type = "「动态衰减系数k」"
else:
show_type = "「其他」"
# ========== 过滤极小筹码 ==========
dist = dist[dist['percent'] >= min_percent].reset_index(drop=True)
if len(dist) == 0:
print("过滤后无有效筹码数据,请调低过滤阈值")
return
# ========== 计算总数:筹码总和 + 成交量总和 ==========
total_chip = dist['percent'].sum()
total_volume = c.k_box['volume'].sum()
total_volume_yy = total_volume / 10000
# 画布布局
fig = plt.figure(figsize=(16, 9))
ax1 = plt.subplot2grid((1, 12), (0, 0), colspan=9)
ax2 = plt.subplot2grid((1, 12), (0, 9), colspan=3)
ax2.axis('off')
# 绘制基础蓝色筹码
ax1.barh(
y=dist['price_mid'],
width=dist['percent'],
height=(dist['price_right'] - dist['price_left']) * 0.9,
color='#4285F4',
alpha=0.85
)
# 排序
dist_sorted = dist.sort_values('percent', ascending=False).reset_index(drop=True)
top1 = dist_sorted.iloc[0]
top2 = dist_sorted.iloc[1]
top3 = dist_sorted.iloc[2]
colors = ['#E53935', '#FB8C00', '#43A047']
top_list = [top1, top2, top3]
labels = []
# 覆盖高亮前三
for i, row in enumerate(top_list):
pct = row['percent']
l = row['price_left']
r = row['price_right']
mid = row['price_mid']
ax1.barh(
y=mid,
width=pct,
height=(r - l) * 0.9,
color=colors[i],
alpha=1
)
labels.append(f"TOP{i+1} | {l:.2f}~{r:.2f} | 占比 {pct:.2f}%")
# 纵轴固定10刻度,精简刻度去毛刺
price_min = dist['price_left'].min()
price_max = dist['price_right'].max()
y_ticks = np.linspace(price_min, price_max, 10)
ax1.set_yticks(y_ticks)
ax1.tick_params(axis='y', labelsize=10)
# 成本参考线
box_cost = c.box(day)
box_open = c.k_box.iloc[0]['open']
box_close = c.k_box.iloc[-1]['close']
ax1.axhline(y=box_cost, color='maroon', linestyle='--', linewidth=2)
# ax1.axhline(y=box_open, color='purple', linestyle='--', linewidth=2)
ax1.axhline(y=box_close, color='navy', linestyle='--', linewidth=2)
# 图表样式 + 显示总数信息
ax1.set_xlabel('筹码占比 (%)', fontsize=12)
ax1.set_ylabel('价格', fontsize=12)
ax1.set_title(f"{str(c.code).zfill(6)} | 共{day}天箱体筹码分布 {show_type} | 过滤<{min_percent}%细碎筹码", fontsize=14)
# ========== 显示总数 ==========
ax1.text(0.5, 1.05, f"筹码总占比:{total_chip:.2f}% | 箱体总成交量:{total_volume_yy:.2f} 万股",transform=ax1.transAxes, ha='center', fontsize=12, weight='bold', color='darkred')
ax1.grid(alpha=0.3)
# 图例(已修复:你注释掉开盘价,这里也去掉)
legend_elements = [
Rectangle((0, 0), 1, 1, color=colors[0]),
Rectangle((0, 0), 1, 1, color=colors[1]),
Rectangle((0, 0), 1, 1, color=colors[2]),
Line2D([0], [0], color='maroon', linestyle='--', linewidth=2),
Line2D([0], [0], color='navy', linestyle='--', linewidth=2),
]
legend_labels = [
labels[0], labels[1], labels[2],
f'平均成本: {box_cost:.2f}',
f'箱体收盘价: {box_close:.2f}'
]
ax1.legend(legend_elements, legend_labels, loc='upper right', fontsize=10)
# 右侧前15表格
top15 = dist_sorted.head(15).copy()
top15['排名'] = [f'第{i}名' for i in range(1, len(top15)+1)]
top15['价格区间'] = top15.apply(lambda x: f"{x['price_left']:.2f}~{x['price_right']:.2f}", axis=1)
top15['占比(%)'] = top15['percent'].apply(lambda x: f"{x:.2f}%")
table_data = top15[['排名', '价格区间', '占比(%)']].values.tolist()
# ========== 已修复:定义表格列名 ==========
colLabels = ['排名', '价格区间', '筹码占比']
table = ax2.table(
cellText=table_data,
colLabels=colLabels,
loc='center',
cellLoc='center',
colWidths=[0.2, 0.5, 0.3]
)
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1, 1.7)
# 表格前三行上色对应图形颜色
for idx, color in enumerate(['#ffcccc','#ffe6cc','#ccf2cc']):
if idx < len(table_data):
table[(idx+1, 0)].set_facecolor(color)
table[(idx+1, 1)].set_facecolor(color)
table[(idx+1, 2)].set_facecolor(color)
ax2.text(0.5, 0.97, f'过滤细碎筹码\n前15名(≥{min_percent}%)',ha='center', va='top', fontsize=12, weight='bold')
plt.tight_layout()
plt.subplots_adjust(wspace=0.3)
if save_path:
# 自动创建目录(如果不存在)
from pathlib import Path
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
plt.savefig(save_path, dpi=dpi, bbox_inches='tight', facecolor='white')
print(f"图片已保存至: {save_path}")
plt.show()
k_day=pd.read_csv('cmf_quant4.csv')
k_30=pd.read_csv('cmf_half.csv')
c = chip(k_day, k_30)
c.show_type=0
c.day_cmp()
c.box(300) # 300 个交易日/
dist = c.get_map()
plot_box_chip_top3(c,dist,"a.jpg")
60日筹码结构
每日的5分钟k线 均价 成为当日筹码结构价格点 一共48组
筹码 残留度 使用(1-turn*k)
当日各个价格筹码量从下一日开始循环迭代乘以残留度 ,到最后一天可以获得60日筹码结构
方法一 k取1
方法二 反正取第二日k值 第二日均价 比较前一日均价 如果第二日价高 k 0.85 如果一样 k 0.75 如果更低 k 0.6
方法三 更具当日筹码结构 去计算 当日均价 与筹码获利盘 和 套牢盘 不同价格差价下比率 为k 然后更具对应筹码结构 算出 整体系数 首先按照K=1 先算出第一个60日筹码结构后期可以使用不同的K去实现不同的筹码结构
最后 筹码结构图
筹码均价
当日均价 或者价格
筹码峰