画图 最后

import pandas as pd
import numpy as np
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from matplotlib.lines import Line2D
import time


# Global matplotlib setup: list CJK-capable fonts first so Chinese labels
# render, and disable the Unicode minus so negative tick labels display.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'WenQuanYi Zen Hei', 'Heiti TC', 'sans-serif'],
    'axes.unicode_minus': False,
})


class chip:
    """Estimate a chip (holding-cost) distribution from daily and 30-minute bars.

    Each trading day gets a retention factor derived from its turnover; the
    volume of every 30-minute bar is then scaled by the product of the
    retention factors of all *later* days, giving the volume assumed to be
    still held ("cmp_volume").  Averages and histograms are weighted by it.

    Columns read by the code:
        k_day: ``mcode``, ``date``, ``turn``, ``amount``, ``volume``
        k_30:  ``date``, ``datetime``, ``amount``, ``volume``

    Attributes:
        show_type: decay mode used by ``day_cmp``.  ``<= 0`` -> no decay
            (stacked chips); ``1`` -> decay ``1 - turn``; ``2`` -> decay
            ``1 - turn * k`` with ``k`` chosen from the day-over-day change
            of the average traded price.
        day: ``box_days`` of the most recent ``box()`` call.
        code: ``mcode`` of the first row of the ``k_day`` passed in.
        avg_price: retained-volume-weighted average price (set by ``day_cmp``).
        k_day_copy: daily rows from the first 30-minute date on, with the
            ``today``/``yesterday``/``k``/``cmp`` columns (set by ``day_cmp``).
        k_30_result: 30-minute bars with ``cmp``, ``cmp_volume``, ``avg`` and
            ``cmp_amount`` columns (set by ``day_cmp``).
        k_box: 30-minute bars of the current box (set by ``box``).
        box_chip_dist: last histogram returned by ``get_map``.
    """

    def __init__(self, k_day, k_30):
        """Copy both frames, parse their date columns and sort them ascending.

        Args:
            k_day: daily bars (DataFrame); must contain at least one row.
            k_30: 30-minute bars (DataFrame).
        """
        self.k_30 = k_30.copy()
        self.k_day = k_day.copy()
        self.k_day_copy = None
        self.k_30_result = None
        self.avg_price = 0
        self.k_box = None
        self.box_chip_dist = None
        self.show_type = 0
        self.day = 0
        self.code = k_day.iloc[0]['mcode']

        # Normalise date columns to datetime so comparisons and merges work.
        self.k_30['date'] = pd.to_datetime(self.k_30['date'])
        self.k_30['datetime'] = pd.to_datetime(self.k_30['datetime'])
        self.k_day['date'] = pd.to_datetime(self.k_day['date'])

        # Turnover is always divided by 100 (presumably percent -> fraction;
        # confirm against the data source).
        self.k_day['turn'] = self.k_day['turn'] / 100

        # Chronological order is required by shift() and the suffix products.
        self.k_30 = self.k_30.sort_values('datetime', ascending=True).reset_index(drop=True)
        self.k_day = self.k_day.sort_values('date', ascending=True).reset_index(drop=True)

    def day_cmp(self):
        """Compute per-day retention and the retained-volume average price.

        Populates ``k_day_copy``, ``k_30_result`` and ``avg_price``.
        ``avg_price`` is 0 when no retained volume is available.
        """
        s_date = self.k_30.iloc[0]['date']

        daily = self.k_day.loc[self.k_day['date'] >= s_date].copy()
        daily['today'] = daily['amount'] / daily['volume']
        daily['yesterday'] = daily['amount'].shift(1) / daily['volume'].shift(1)

        if self.show_type == 2:
            # Moves within +/-0.2% count as sideways (k = 0.8); a NaN change
            # (first row) matches neither condition and also keeps 0.8.
            change = ((daily['today'] - daily['yesterday']) / daily['yesterday']).to_numpy()
            daily['k'] = np.select([change < -0.002, change > 0.002], [0.65, 0.95], default=0.8)
        else:
            daily['k'] = 1

        if self.show_type > 0:
            factor_arr = 1 - daily['turn'].values * daily['k'].values
        else:
            # Stacked mode: nothing decays.  Plain ones (instead of
            # ``1 - turn * k * 0``) keep a NaN turnover from leaking in.
            factor_arr = np.ones(len(daily), dtype=float)

        # retention[i] = product of factor[j] for every later day j > i; the
        # last day keeps everything.  A reversed cumulative product gives all
        # suffix products in O(n) without materialising an n x n matrix.
        n = len(factor_arr)
        retention = np.ones(n, dtype=float)
        if n > 1:
            retention[:-1] = np.cumprod(factor_arr[::-1])[::-1][1:]

        daily['cmp'] = retention
        self.k_day_copy = daily
        self.merge_day_cmp_to_30m()

        result = self.k_30_result
        result['cmp_volume'] = result['cmp'] * result['volume']
        result['avg'] = result['amount'] / result['volume']
        result['cmp_amount'] = result['avg'] * result['cmp_volume']

        total_vol = result['cmp_volume'].sum()
        total_amt = result['cmp_amount'].sum()

        self.avg_price = total_amt / total_vol if total_vol > 0 else 0

    def merge_day_cmp_to_30m(self):
        """Left-join the daily ``cmp`` column onto the 30-minute bars by ``date``."""
        day_cmp = self.k_day_copy[['date', 'cmp']].copy()
        self.k_30_result = self.k_30.copy().merge(day_cmp, on='date', how='left')

    def box(self, box_days=120, end_date=None):
        """Select the box of the last ``box_days`` trading days and return its cost.

        Args:
            box_days: number of trading days (rows of the daily frame) in the box.
            end_date: last day of the box; defaults to, and is capped at, the
                latest daily date.

        Returns:
            The ``cmp_volume``-weighted mean of ``avg`` over the box's
            30-minute bars, or 0 when the box has no retained volume.

        Raises:
            RuntimeError: if ``day_cmp()`` has not been called yet.
        """
        self.day = box_days
        if self.k_30_result is None:
            raise RuntimeError("先调用 day_cmp() 计算筹码留存")

        latest = self.k_day['date'].max()
        if end_date is None:
            end_date = latest
        else:
            end_date = pd.to_datetime(end_date)
        if end_date > latest:
            end_date = latest

        # Count trading days on the daily frame, not calendar days; tail()
        # already returns everything when fewer than box_days rows exist.
        box_dates = self.k_day.loc[self.k_day['date'] <= end_date, 'date'].tail(box_days)
        start_date = box_dates.min()

        bars = self.k_30_result
        in_box = (bars['date'] >= start_date) & (bars['date'] <= end_date)
        df_box = bars[in_box].copy()

        total_vol = df_box['cmp_volume'].sum()
        if total_vol == 0:
            box_cost = 0
        else:
            box_cost = (df_box['avg'] * df_box['cmp_volume']).sum() / total_vol

        self.k_box = df_box
        return box_cost

    def get_map(self, bins=80):
        """Histogram the box's retained volume over equal-width price bins.

        Args:
            bins: passed to ``pd.cut`` on the ``avg`` column.

        Returns:
            DataFrame with one row per bin (empty bins included) and columns
            ``price_range``, ``cmp_volume``, ``percent``, ``price_left``,
            ``price_right`` and ``price_mid``.

        Raises:
            RuntimeError: if ``box()`` has not produced a non-empty box.
        """
        if self.k_box is None or self.k_box.empty:
            raise RuntimeError("先调用 box() 计算箱体")

        df_box = self.k_box.copy()
        df_box['price_range'] = pd.cut(df_box['avg'], bins=bins)
        # observed=False keeps empty bins and pins the behaviour regardless of
        # the pandas default for categorical groupers.
        dist = (
            df_box.groupby('price_range', observed=False)['cmp_volume']
            .sum()
            .reset_index()
        )
        total = dist['cmp_volume'].sum()
        dist['percent'] = dist['cmp_volume'] / total * 100 if total > 0 else 0

        dist['price_left'] = [float(iv.left) for iv in dist['price_range']]
        dist['price_right'] = [float(iv.right) for iv in dist['price_range']]
        dist['price_mid'] = (dist['price_left'] + dist['price_right']) / 2

        self.box_chip_dist = dist
        return dist


def _box_weighted_cost(k_box):
    """Return the ``cmp_volume``-weighted mean of ``avg`` over ``k_box`` (0 if no volume)."""
    retained = k_box['cmp_volume']
    total = retained.sum()
    if total == 0:
        return 0
    return (k_box['avg'] * retained).sum() / total


def plot_box_chip_top3(c, dist, save_path=None, min_percent=0.01, dpi=300):
    """Draw the box's chip distribution with the largest bins highlighted.

    Left panel: horizontal bars of ``percent`` per price bin, the top three
    bins (fewer if fewer survive the filter) over-drawn in red/orange/green,
    plus dashed lines for the box's weighted cost and last close.  Right
    panel: a table of the 15 largest bins, its first three rows tinted to
    match.  The figure is shown with ``plt.show()``.

    Args:
        c: a ``chip`` instance on which ``box()`` has been called; ``c.k_box``
            must hold ``volume``, ``close``, ``avg`` and ``cmp_volume`` columns.
        dist: the frame returned by ``chip.get_map()``.
        save_path: if truthy, the figure is also saved here (parent
            directories are created).
        min_percent: bins whose ``percent`` is below this value are dropped.
        dpi: resolution used when saving.

    Returns:
        None.  When no bin survives the filter a message is printed and
        nothing is drawn.
    """
    day = c.day
    show_type = {
        0: "「堆叠筹码」",
        1: "「恒定衰减系数K=1」",
        2: "「动态衰减系数k」",
    }.get(c.show_type, "「其他」")

    # Drop negligible bins so they do not clutter the chart.
    dist = dist[dist['percent'] >= min_percent].reset_index(drop=True)
    if dist.empty:
        print("过滤后无有效筹码数据,请调低过滤阈值")
        return

    # Totals shown above the chart: summed percent of the kept bins and the
    # raw box volume divided by 10000.
    total_chip = dist['percent'].sum()
    total_volume_yy = c.k_box['volume'].sum() / 10000

    # Reference levels come from the box the caller already selected.
    # Re-running c.box(day) here would replace c.k_box with the default
    # end_date window and could disagree with ``dist``.
    box_cost = _box_weighted_cost(c.k_box)
    box_close = c.k_box.iloc[-1]['close']

    # Layout: 9/12 of the width for the chart, 3/12 for the table.
    fig = plt.figure(figsize=(16, 9))
    ax1 = plt.subplot2grid((1, 12), (0, 0), colspan=9)
    ax2 = plt.subplot2grid((1, 12), (0, 9), colspan=3)
    ax2.axis('off')

    # Base layer: every bin in blue.
    ax1.barh(
        y=dist['price_mid'],
        width=dist['percent'],
        height=(dist['price_right'] - dist['price_left']) * 0.9,
        color='#4285F4',
        alpha=0.85
    )

    dist_sorted = dist.sort_values('percent', ascending=False).reset_index(drop=True)
    colors = ['#E53935', '#FB8C00', '#43A047']
    top = dist_sorted.head(len(colors))
    labels = []

    # Over-draw the largest bins; zip() stops early when fewer than three
    # bins are left, so short distributions no longer raise IndexError.
    for rank, (color, l, r, mid, pct) in enumerate(
        zip(colors, top['price_left'], top['price_right'], top['price_mid'], top['percent']),
        start=1,
    ):
        ax1.barh(
            y=mid,
            width=pct,
            height=(r - l) * 0.9,
            color=color,
            alpha=1
        )
        labels.append(f"TOP{rank} | {l:.2f}~{r:.2f} | 占比 {pct:.2f}%")

    # Exactly 10 evenly spaced y ticks across the kept price range.
    price_min = dist['price_left'].min()
    price_max = dist['price_right'].max()
    ax1.set_yticks(np.linspace(price_min, price_max, 10))
    ax1.tick_params(axis='y', labelsize=10)

    ax1.axhline(y=box_cost, color='maroon', linestyle='--', linewidth=2)
    ax1.axhline(y=box_close, color='navy', linestyle='--', linewidth=2)

    ax1.set_xlabel('筹码占比 (%)', fontsize=12)
    ax1.set_ylabel('价格', fontsize=12)
    ax1.set_title(f"{str(c.code).zfill(6)} | 共{day}天箱体筹码分布 {show_type} | 过滤<{min_percent}%细碎筹码", fontsize=14)

    ax1.text(0.5, 1.05, f"筹码总占比:{total_chip:.2f}% | 箱体总成交量:{total_volume_yy:.2f} 万股", transform=ax1.transAxes, ha='center', fontsize=12, weight='bold', color='darkred')

    ax1.grid(alpha=0.3)

    # Legend: one swatch per highlighted bin, then the two reference lines.
    legend_elements = [Rectangle((0, 0), 1, 1, color=color) for color in colors[:len(labels)]]
    legend_elements += [
        Line2D([0], [0], color='maroon', linestyle='--', linewidth=2),
        Line2D([0], [0], color='navy', linestyle='--', linewidth=2),
    ]
    legend_labels = labels + [
        f'平均成本: {box_cost:.2f}',
        f'箱体收盘价: {box_close:.2f}'
    ]
    ax1.legend(legend_elements, legend_labels, loc='upper right', fontsize=10)

    # Right-hand table with the 15 largest bins.
    top15 = dist_sorted.head(15).copy()
    top15['排名'] = [f'第{i}名' for i in range(1, len(top15) + 1)]
    top15['价格区间'] = top15.apply(lambda x: f"{x['price_left']:.2f}~{x['price_right']:.2f}", axis=1)
    top15['占比(%)'] = top15['percent'].apply(lambda x: f"{x:.2f}%")

    table_data = top15[['排名', '价格区间', '占比(%)']].values.tolist()
    colLabels = ['排名', '价格区间', '筹码占比']

    table = ax2.table(
        cellText=table_data,
        colLabels=colLabels,
        loc='center',
        cellLoc='center',
        colWidths=[0.2, 0.5, 0.3]
    )
    table.auto_set_font_size(False)
    table.set_fontsize(9)
    table.scale(1, 1.7)

    # Tint the first rows to match the highlight colours (row 0 is the header).
    for idx, color in enumerate(['#ffcccc', '#ffe6cc', '#ccf2cc']):
        if idx < len(table_data):
            for col in range(3):
                table[(idx + 1, col)].set_facecolor(color)

    ax2.text(0.5, 0.97, f'过滤细碎筹码\n前15名(≥{min_percent}%)', ha='center', va='top', fontsize=12, weight='bold')

    plt.tight_layout()
    plt.subplots_adjust(wspace=0.3)

    if save_path:
        # Create the target directory if it does not exist yet.
        from pathlib import Path
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)

        fig.savefig(save_path, dpi=dpi, bbox_inches='tight', facecolor='white')
        print(f"图片已保存至: {save_path}")
    plt.show()





# Load the daily bars and the 30-minute bars from CSV.
k_day = pd.read_csv('cmf_quant4.csv')
k_30 = pd.read_csv('cmf_half.csv')

# Build the estimator in stacked mode (show_type 0), compute retention,
# select a box of 300 trading days and plot its chip distribution.
c = chip(k_day=k_day, k_30=k_30)
c.show_type = 0
c.day_cmp()
c.box(box_days=300)
dist = c.get_map()
plot_box_chip_top3(c, dist, save_path="a.jpg")


此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复