import pandas as pd
import numpy as np
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from matplotlib.lines import Line2D
import time
# Global configuration: prefer CJK-capable fonts so Chinese labels render,
# and disable the Unicode minus so negative signs display with those fonts.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'WenQuanYi Zen Hei', 'Heiti TC', 'sans-serif'],
    'axes.unicode_minus': False,
})
class chip:
    """Chip (holding-cost) distribution built from daily and 30-minute K-lines.

    Workflow: ``day_cmp()`` -> ``box()`` -> ``get_map()``.

    Attributes set by the caller:
        show_type: 0 = stacked (no decay), 1 = decay with constant k = 1,
            2 = decay with a k chosen per day from the day-over-day change
            of the average traded price.
    """

    def __init__(self, k_day, k_30):
        """Copy, normalise and sort the input frames.

        Args:
            k_day: Daily bars. Columns read by this class: ``mcode``,
                ``date``, ``turn``, ``amount``, ``volume``.
            k_30: 30-minute bars. Columns read by this class: ``date``,
                ``datetime``, ``amount``, ``volume``.
        """
        self.k_30 = k_30.copy()
        self.k_day = k_day.copy()
        self.k_30_result = None   # 30-minute bars + per-day retention, set by day_cmp()
        self.k_day_copy = None    # daily bars from the first 30-minute date on, set by day_cmp()
        self.avg_price = 0
        self.k_box = None         # 30-minute bars inside the current box, set by box()
        self.box_chip_dist = None
        self.show_type = 0
        self.day = 0
        self.code = k_day.iloc[0]['mcode']

        # Normalise date columns.
        self.k_30['date'] = pd.to_datetime(self.k_30['date'])
        self.k_30['datetime'] = pd.to_datetime(self.k_30['datetime'])
        self.k_day['date'] = pd.to_datetime(self.k_day['date'])

        # NOTE(review): turnover is divided by 100 unconditionally, i.e. the
        # input is assumed to be a percentage -- confirm against the data source.
        self.k_day['turn'] = self.k_day['turn'] / 100

        # Sort chronologically.
        self.k_30 = self.k_30.sort_values('datetime', ascending=True).reset_index(drop=True)
        self.k_day = self.k_day.sort_values('date', ascending=True).reset_index(drop=True)

    def day_cmp(self):
        """Compute the per-day retention factor and the overall average cost.

        For each daily row i (from the first 30-minute date onward) the
        retention ``cmp[i]`` is the product of ``1 - turn[j] * k[j]`` over all
        later days ``j > i``; the last day therefore has retention 1.  The
        factors are merged onto the 30-minute bars, producing the columns
        ``cmp``, ``cmp_volume``, ``avg`` and ``cmp_amount`` in
        ``self.k_30_result``, and ``self.avg_price`` is set to the
        retention-weighted average price (0 when the weighted volume is not
        positive).
        """
        s_date = self.k_30.iloc[0]['date']
        day_df = self.k_day.loc[self.k_day['date'] >= s_date].copy()
        day_df['today'] = day_df['amount'] / day_df['volume']
        day_df['yesterday'] = day_df['today'].shift(1)

        if self.show_type == 2:
            # Relative change of the average traded price; moves within
            # +/-0.2% are treated as flat and keep the base k of 0.8.
            change = (day_df['today'] - day_df['yesterday']) / day_df['yesterday']
            day_df['k'] = 0.8
            day_df.loc[change < -0.002, 'k'] = 0.65
            day_df.loc[change > 0.002, 'k'] = 0.95
        else:
            day_df['k'] = 1

        turn_arr = day_df['turn'].to_numpy(dtype=float)
        k_arr = day_df['k'].to_numpy(dtype=float)
        if self.show_type > 0:
            factor_arr = 1 - turn_arr * k_arr
        else:
            # Stacked mode ignores turnover entirely, so a missing turnover
            # value must not poison the retention of earlier days.
            factor_arr = np.ones_like(turn_arr)

        # Suffix product excluding the day itself, computed in O(n) with a
        # reversed cumulative product instead of an n x n matrix.
        n = len(factor_arr)
        retention = np.ones(n, dtype=float)
        if n > 1:
            retention[:-1] = np.cumprod(factor_arr[:0:-1])[::-1]
        day_df['cmp'] = retention
        self.k_day_copy = day_df

        self.merge_day_cmp_to_30m()
        res = self.k_30_result
        res['cmp_volume'] = res['cmp'] * res['volume']
        res['avg'] = res['amount'] / res['volume']
        res['cmp_amount'] = res['avg'] * res['cmp_volume']
        total_vol = res['cmp_volume'].sum()
        total_amt = res['cmp_amount'].sum()
        self.avg_price = total_amt / total_vol if total_vol > 0 else 0

    def merge_day_cmp_to_30m(self):
        """Left-join the daily ``cmp`` factor onto the 30-minute bars by ``date``.

        The result is stored in ``self.k_30_result``; 30-minute bars whose
        date has no daily row get a missing ``cmp``.
        """
        day_cmp = self.k_day_copy[['date', 'cmp']].copy()
        self.k_30_result = self.k_30.copy().merge(day_cmp, on='date', how='left')

    def box(self, box_days=120, end_date=None):
        """Select a box of the most recent trading days and return its cost.

        Args:
            box_days: Number of daily rows (trading days) the box spans,
                counted backwards from ``end_date``.
            end_date: Last date of the box; defaults to, and is capped at,
                the latest daily date.

        Returns:
            The ``cmp_volume``-weighted average of ``avg`` over the 30-minute
            bars inside the box, or 0 when the weighted volume is 0.

        Raises:
            RuntimeError: If ``day_cmp()`` has not been called yet.
        """
        if self.k_30_result is None:
            raise RuntimeError("先调用 day_cmp() 计算筹码保留系数")
        self.day = box_days
        df_day = self.k_day
        df_30 = self.k_30_result

        # 1. Resolve the end date.
        last_day = df_day['date'].max()
        if end_date is None:
            end_date = last_day
        else:
            end_date = pd.to_datetime(end_date)
            if end_date > last_day:
                end_date = last_day

        # 2-3. Take the last `box_days` daily rows up to end_date; tail()
        # already returns everything when fewer rows are available.
        df_day_box = df_day[df_day['date'] <= end_date].tail(box_days)
        start_date = df_day_box['date'].min()

        # 4. Filter the 30-minute bars by the resulting trading-day range.
        in_box = (df_30['date'] >= start_date) & (df_30['date'] <= end_date)
        df_box = df_30.loc[in_box].copy()

        total_vol = df_box['cmp_volume'].sum()
        if total_vol == 0:
            box_cost = 0
        else:
            box_cost = (df_box['avg'] * df_box['cmp_volume']).sum() / total_vol
        self.k_box = df_box
        return box_cost

    def get_map(self, bins=80):
        """Bin the box's bars by average price and sum retained volume per bin.

        Args:
            bins: Number of equal-width price bins passed to ``pd.cut``.

        Returns:
            A DataFrame with one row per bin (empty bins included) and the
            columns ``price_range``, ``cmp_volume``, ``percent``,
            ``price_left``, ``price_right`` and ``price_mid``.  It is also
            stored in ``self.box_chip_dist``.

        Raises:
            RuntimeError: If ``box()`` has not produced a non-empty box.
        """
        if self.k_box is None or self.k_box.empty:
            raise RuntimeError("先调用 box() 计算箱体")
        df_box = self.k_box.copy()
        df_box['price_range'] = pd.cut(df_box['avg'], bins=bins)
        # observed=False keeps empty bins regardless of the pandas default.
        dist = (
            df_box.groupby('price_range', observed=False)['cmp_volume']
            .sum()
            .reset_index()
        )
        total = dist['cmp_volume'].sum()
        dist['percent'] = dist['cmp_volume'] / total * 100 if total > 0 else 0
        dist['price_left'] = [float(iv.left) for iv in dist['price_range']]
        dist['price_right'] = [float(iv.right) for iv in dist['price_range']]
        dist['price_mid'] = (dist['price_left'] + dist['price_right']) / 2
        self.box_chip_dist = dist
        return dist
def plot_box_chip_top3(c, dist, save_path=None, min_percent=0.01, dpi=300):
    """Plot the box chip distribution with the largest price bins highlighted.

    The figure has a horizontal bar chart of chip percentage per price bin on
    the left and a ranking table on the right:

    1. Bins whose share is below ``min_percent`` are dropped.
    2. The price axis uses 10 evenly spaced ticks.
    3. The top bins (up to three) are highlighted red / orange / green.
    4. The table lists up to 15 of the remaining bins, largest first, with the
       first three rows tinted to match the highlight colours.
    5. The total displayed chip share and the box's total volume are shown
       above the chart.

    Args:
        c: A ``chip`` instance on which ``box()`` has been called; ``c.k_box``,
            ``c.day``, ``c.code`` and ``c.show_type`` are read.
        dist: Distribution frame as returned by ``chip.get_map()``.
        save_path: Optional image path; parent directories are created.
        min_percent: Minimum share (in percent) a bin needs to be drawn.
        dpi: Resolution used when saving.

    Returns:
        None.  Prints a message and returns early when no bin survives the
        filter.
    """
    day = c.day
    mode_names = {
        0: "「堆叠筹码」",
        1: "「恒定衰减系数K=1」",
        2: "「动态衰减系数k」",
    }
    show_type = mode_names.get(c.show_type, "「其他」")

    # ---- drop negligible bins ----
    dist = dist[dist['percent'] >= min_percent].reset_index(drop=True)
    if dist.empty:
        print("过滤后无有效筹码数据,请调低过滤阈值")
        return

    # ---- totals: displayed chip share and box volume ----
    box_df = c.k_box
    total_chip = dist['percent'].sum()
    total_volume_yy = box_df['volume'].sum() / 10000

    # ---- layout: chart on the left, table on the right ----
    fig = plt.figure(figsize=(16, 9))
    ax1 = plt.subplot2grid((1, 12), (0, 0), colspan=9)
    ax2 = plt.subplot2grid((1, 12), (0, 9), colspan=3)
    ax2.axis('off')

    # Base bars for every remaining bin.
    ax1.barh(
        y=dist['price_mid'],
        width=dist['percent'],
        height=(dist['price_right'] - dist['price_left']) * 0.9,
        color='#4285F4',
        alpha=0.85,
    )

    # Overdraw the largest bins; there may be fewer than three after
    # filtering, so only highlight what actually exists.
    dist_sorted = dist.sort_values('percent', ascending=False).reset_index(drop=True)
    colors = ['#E53935', '#FB8C00', '#43A047']
    top_rows = dist_sorted.head(len(colors))
    labels = []
    for i in range(len(top_rows)):
        row = top_rows.iloc[i]
        pct = row['percent']
        left = row['price_left']
        right = row['price_right']
        ax1.barh(
            y=row['price_mid'],
            width=pct,
            height=(right - left) * 0.9,
            color=colors[i],
            alpha=1,
        )
        labels.append(f"TOP{i+1} | {left:.2f}~{right:.2f} | 占比 {pct:.2f}%")

    # Fixed 10 ticks on the price axis.
    price_min = dist['price_left'].min()
    price_max = dist['price_right'].max()
    ax1.set_yticks(np.linspace(price_min, price_max, 10))
    ax1.tick_params(axis='y', labelsize=10)

    # Reference lines.  The cost is derived from the box already stored on
    # `c` rather than by calling c.box() again: re-running box() would replace
    # c.k_box and discard any end_date the caller selected.
    weight = box_df['cmp_volume']
    weight_sum = weight.sum()
    box_cost = (box_df['avg'] * weight).sum() / weight_sum if weight_sum != 0 else 0
    box_close = box_df.iloc[-1]['close']
    ax1.axhline(y=box_cost, color='maroon', linestyle='--', linewidth=2)
    ax1.axhline(y=box_close, color='navy', linestyle='--', linewidth=2)

    # Labels, title and the totals banner.
    ax1.set_xlabel('筹码占比 (%)', fontsize=12)
    ax1.set_ylabel('价格', fontsize=12)
    ax1.set_title(f"{str(c.code).zfill(6)} | 共{day}天箱体筹码分布 {show_type} | 过滤<{min_percent}%细碎筹码", fontsize=14)
    ax1.text(0.5, 1.05, f"筹码总占比:{total_chip:.2f}% | 箱体总成交量:{total_volume_yy:.2f} 万股", transform=ax1.transAxes, ha='center', fontsize=12, weight='bold', color='darkred')
    ax1.grid(alpha=0.3)

    # Legend: one patch per highlighted bin plus the two reference lines.
    legend_elements = [
        Rectangle((0, 0), 1, 1, color=colors[i]) for i in range(len(labels))
    ]
    legend_elements.append(Line2D([0], [0], color='maroon', linestyle='--', linewidth=2))
    legend_elements.append(Line2D([0], [0], color='navy', linestyle='--', linewidth=2))
    legend_labels = labels + [
        f'平均成本: {box_cost:.2f}',
        f'箱体收盘价: {box_close:.2f}',
    ]
    ax1.legend(legend_elements, legend_labels, loc='upper right', fontsize=10)

    # Ranking table (up to 15 rows).
    top15 = dist_sorted.head(15).copy()
    top15['排名'] = [f'第{i}名' for i in range(1, len(top15) + 1)]
    top15['价格区间'] = top15.apply(lambda x: f"{x['price_left']:.2f}~{x['price_right']:.2f}", axis=1)
    top15['占比(%)'] = top15['percent'].apply(lambda x: f"{x:.2f}%")
    table_data = top15[['排名', '价格区间', '占比(%)']].values.tolist()
    colLabels = ['排名', '价格区间', '筹码占比']
    table = ax2.table(
        cellText=table_data,
        colLabels=colLabels,
        loc='center',
        cellLoc='center',
        colWidths=[0.2, 0.5, 0.3],
    )
    table.auto_set_font_size(False)
    table.set_fontsize(9)
    table.scale(1, 1.7)

    # Tint the first three data rows to match the highlight colours
    # (row 0 of the table is the header).
    for idx, color in enumerate(['#ffcccc', '#ffe6cc', '#ccf2cc']):
        if idx < len(table_data):
            for col in range(3):
                table[(idx + 1, col)].set_facecolor(color)
    ax2.text(0.5, 0.97, f'过滤细碎筹码\n前15名(≥{min_percent}%)', ha='center', va='top', fontsize=12, weight='bold')

    fig.tight_layout()
    fig.subplots_adjust(wspace=0.3)
    if save_path:
        # Create the target directory if it does not exist yet.
        from pathlib import Path
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(save_path, dpi=dpi, bbox_inches='tight', facecolor='white')
        print(f"图片已保存至: {save_path}")
    plt.show()
# Driver: load the daily and 30-minute CSVs, build the chip model in stacked
# mode (show_type 0), take a 300-trading-day box and plot its distribution.
k_day = pd.read_csv('cmf_quant4.csv')
k_30 = pd.read_csv('cmf_half.csv')

c = chip(k_day, k_30)
c.show_type = 0
c.day_cmp()
c.box(box_days=300)  # 300 trading days
dist = c.get_map()

plot_box_chip_top3(c, dist, save_path="a.jpg")