代码

import pandas as pd
import numpy as np
import baostock as bs
import requests
from datetime import datetime, timedelta
import time
# import talib as ta 
#使用quant()函数替换

class GP:
    """Fetch daily bars from baostock, merge a live quote and add indicators.

    Creating an instance logs in to baostock immediately; call ``logout``
    when finished. Failure is signalled by returning ``False`` from
    ``gp_code``, ``get_today`` and ``gupiao``.
    """

    # First digit of a stock code -> exchange prefix used by both data sources.
    _EXCHANGE_BY_PREFIX = {
        '0': 'sz', '1': 'sz', '2': 'sz', '3': 'sz',
        '5': 'sh', '6': 'sh', '9': 'sh',
        '4': 'bj', '8': 'bj',
    }

    def __init__(self):
        self.login()

    def login(self):
        """Open the baostock session."""
        bs.login()

    def logout(self):
        """Close the baostock session."""
        bs.logout()

    def get_history(self, gp_type, code, end_date=False, start_date=False, days=14):
        """Return forward-adjusted daily bars from baostock as a DataFrame.

        Args:
            gp_type: Exchange prefix, e.g. ``"sh"`` or ``"sz"``.
            code: Stock code; joined to ``gp_type`` as ``"<gp_type>.<code>"``.
            end_date: ``"YYYY-MM-DD"`` string; defaults to today when falsy.
            start_date: ``"YYYY-MM-DD"`` string; defaults to ``end_date``
                minus ``days`` calendar days when falsy.
            days: Calendar-day look-back used only when ``start_date`` is
                not given.

        Returns:
            DataFrame with columns code, date, open, high, low, close,
            volume, adjustflag. Prices and volume are floats, adjustflag
            is an int, and date is left as returned by baostock.
        """
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        if not start_date:
            start_date = (
                datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days)
            ).strftime("%Y-%m-%d")
        rs = bs.query_history_k_data_plus(
            code=gp_type + "." + str(code),
            fields="code,date,open,high,low,close,volume,adjustflag",
            start_date=start_date,   # baostock requires dashed dates
            end_date=end_date,
            frequency="d",           # daily bars
            adjustflag="2"           # 2 = forward-adjusted prices
        )
        df = rs.get_data()
        # Empty cells arrive as "" and would break the float conversion below.
        df = df.replace("", 0)
        # baostock returns every field as a string; convert the numeric ones.
        df = df.astype({
            "code": str,
            "open": float,
            "high": float,
            "low": float,
            "close": float,
            "volume": float,
            "adjustflag": int
        })
        return df

    def gp_code(self, code):
        """Map a stock code to ``[exchange_prefix, code_string]``.

        The exchange is chosen from the first character of ``str(code)``.
        Returns ``False`` when the code is empty or the first character is
        not a recognised prefix.
        """
        str_code = str(code)
        # Slicing (not indexing) so an empty code yields "" instead of raising.
        gp_type = self._EXCHANGE_BY_PREFIX.get(str_code[:1])
        if gp_type is None:
            return False
        return [gp_type, str_code]

    def get_today(self, gp_type, code, timeout=10):
        """Fetch today's quote from the Tencent quote endpoint.

        Args:
            gp_type: Exchange prefix, e.g. ``"sh"``.
            code: Stock code appended to ``gp_type`` in the query string.
            timeout: Seconds to wait for the HTTP response.

        Returns:
            ``[bar, extra]`` where ``bar`` has the same keys as a
            ``get_history`` row and ``extra`` holds ``name`` and ``money``;
            ``False`` when the request fails, the status is not 200, the
            payload is 30 bytes or shorter, or the payload cannot be parsed.
        """
        str_code = str(code)
        url = "https://qt.gtimg.cn/q=" + gp_type + str_code
        headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
        }
        try:
            # Without a timeout a stalled connection would hang the caller forever.
            res = requests.get(url, headers=headers, timeout=timeout)
        except requests.RequestException:
            return False
        if res.status_code != 200 or len(res.content) <= 30:
            return False
        try:
            reb = res.content.decode("gbk")
            # Runs of "~" (empty fields) are collapsed first, so the indices
            # below refer to the collapsed layout.
            # NOTE(review): field positions are taken from the original code;
            # confirm against the current endpoint format.
            reb = reb.replace("~~~", "~").replace("~~", "~")
            vsx = reb.split('~')
            bar = {
                "code": gp_type + "." + str_code,
                "date": datetime.now().strftime("%Y-%m-%d"),
                "open": float(vsx[5]),
                "high": float(vsx[32]),
                "low": float(vsx[33]),
                "close": float(vsx[3]),
                # presumably lots -> shares to match baostock; verify
                "volume": float(vsx[6]) * 100,
                "adjustflag": 2  # same flag value as the baostock rows
            }
            extra = {
                "name": vsx[1],
                'money': vsx[42]
            }
        except (IndexError, ValueError):
            # Truncated or malformed payload: report failure like other paths.
            return False
        return [bar, extra]

    def gupiao(self, gp_type, gp_code, days=180):
        """Return history for the last ``days`` calendar days plus today's bar.

        The live quote is appended as the final row because the history
        query may not contain today's bar yet. If the history already has
        a row dated today, that row is dropped so the date is not
        duplicated.

        Returns:
            The combined DataFrame, or ``False`` when no live quote is
            available or today's volume is zero.
        """
        ma = self.get_today(gp_type, gp_code)
        if not ma:
            return False
        today_bar = ma[0]
        if int(today_bar['volume']) == 0:
            return False
        df = self.get_history(gp_type, gp_code, days=days)
        # Avoid two rows for the same date once baostock has published today.
        df = df[df["date"] != today_bar["date"]]
        return pd.concat([df, pd.DataFrame([today_bar])], ignore_index=True)

    def quant(self, df):
        """Add indicator columns to ``df`` in place and return it.

        Requires columns open, high, low, close and volume. Adds
        volume and close moving averages (5/10/20/60), true range and an
        EMA-smoothed ATR (span 14), RSI over 14 and 6 bars based on simple
        rolling means of gains and losses, ``money_ma5`` and ``status``
        (1 when close > open, otherwise 0).
        """
        windows = (5, 10, 20, 60)
        # Volume moving averages
        for window in windows:
            df[f'vol_ma{window}'] = df["volume"].rolling(window=window).mean()
        # Close-price moving averages
        for window in windows:
            df[f'ma{window}'] = df["close"].rolling(window=window).mean()
        # ATR: true range is the largest of the three ranges per row
        prev_close = df['close'].shift(1)
        df['hl'] = df['high'] - df['low']
        df['hc'] = abs(df['high'] - prev_close)
        df['lc'] = abs(df['low'] - prev_close)
        df['tr'] = df[['hl', 'hc', 'lc']].abs().max(axis=1)
        df['atr'] = df['tr'].ewm(span=14, adjust=False).mean()
        # RSI over 14 and 6 bars
        df['delta'] = df['close'].diff()
        df['gain'] = df['delta'].where(df['delta'] > 0, 0)
        df['loss'] = -df['delta'].where(df['delta'] < 0, 0)
        for period in (14, 6):
            gain_col = f'avg_gain({period})'
            loss_col = f'avg_loss({period})'
            df[gain_col] = df['gain'].rolling(period).mean()
            df[loss_col] = df['loss'].rolling(period).mean()
            # A zero average loss would divide by zero; use a tiny floor.
            df[loss_col] = df[loss_col].replace(0, 1e-6)
            df[f'rs({period})'] = df[gain_col] / df[loss_col]
            df[f'rsi({period})'] = 100 - (100 / (1 + df[f'rs({period})']))
        # Turnover proxy built from the 5-bar price and volume averages
        df['money_ma5'] = 100 * df['ma5'] * df['vol_ma5']
        # 1 = close above open, 0 = otherwise
        df['status'] = np.where((df['close'] - df['open']) > 0, 1, 0)
        return df







#1 10收盘价接近10日最高点  10
# df.iloc[-1]['close']>(8*(df['high'].iloc[-10:].max()-df['high'].iloc[-10:].min())/10+df['high'].iloc[-10:].min())
# #2 收盘价大于MA5 10
# df.iloc[-1]['close']>df.iloc[-1]['ma5']
# #3 今日MA5>昨日MA5 10
# df.iloc[-1]['ma5']>df.iloc[-2]['ma5']
# #4 今日ATR在最近20个交易日内ATR排序 10
# atrr=(df['atr'].iloc[-20:]<df['atr'].iloc[-1]).sum()/20
# atrr>0.2 and atrr<0.8
# #5 平均成交额大于2亿 10
# df.iloc[-1]['money_ma5']>200000000
# #6 RSI 6小于70 10
# df.iloc[-1]['rsi(6)']<70
# df.iloc[-1]['rsi(6)']>30
# #7 收盘价大于MA20 10
# df.iloc[-1]['close']>df.iloc[-1]['ma20']
# #8 今日MA20>昨日MA20 10
# df.iloc[-1]['ma20']>df.iloc[-2]['ma20']
# #9 20日新高点 10
# df.iloc[-1]['high'] > df['high'].iloc[-20:-1].max() 
# #10 量价指标 -100 -10 5 15
# p=df.iloc[-1]['close']-df.iloc[-2]['close']
# v=df.iloc[-1]['volume']/df['volume'].iloc[-20:].mean()

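# Volume/price rule as implemented in score_def
# (p = close change vs. previous bar, v = volume / mean volume of the last 20 bars):
# p>0  v>1.5       +15  healthy advance
# p>0  1<=v<=1.5   +5   moderate volume expansion
# p>0  v<1         -10  advance on shrinking volume (divergence)
# p<=0 v>=2        -20  panic selling
# p<=0 1<=v<2      -10  decline on rising volume
# p<=0 v<1          0   low-volume consolidation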


def score_def(df):
    """Score the most recent bar of an indicator-enriched frame.

    Args:
        df: DataFrame with at least two rows and the columns close, high,
            volume, ma5, ma20, money_ma5, rsi(6) and atr.

    Returns:
        ``[score, score_list, last_row_dict, last_five_rows_records]``:
        the total score, the labels of the rules that fired, the last row
        as a dict, and the last five rows as a list of dicts.

    Raises:
        IndexError: if ``df`` has fewer than two rows.
    """
    score = 0
    score_list = []
    last = df.iloc[-1]
    prev = df.iloc[-2]

    # Share of the recent ATR values that lie below today's ATR. Divide by
    # the real window length so short histories are not biased downwards.
    atr_window = df['atr'].iloc[-20:]
    atrr = (atr_window < last['atr']).sum() / len(atr_window)
    # Close-to-close change and volume relative to the recent mean volume.
    p = last['close'] - prev['close']
    v = last['volume'] / df['volume'].iloc[-20:].mean()

    recent_highs = df['high'].iloc[-10:]
    high_floor = recent_highs.min()
    # Close must sit in the top 20% of the range spanned by the recent highs.
    near_high_level = 8 * (recent_highs.max() - high_floor) / 10 + high_floor

    # Each rule that holds adds 10 points; order matches the label order.
    ten_point_rules = [
        (last['close'] > near_high_level, "收盘价接近10日最高点 +10"),
        (last['close'] > last['ma5'], "收盘价大于MA5 +10"),
        (last['ma5'] > prev['ma5'], "今日MA5>昨日MA5 +10"),
        (0.2 < atrr < 0.8, "ATR在20日区间位置位于中部 +10"),
        (last['money_ma5'] > 200000000, "平均成交额大于2亿 +10"),
        (30 < last['rsi(6)'] < 70, "RSI6小于70大于30 +10"),
        (last['close'] > last['ma20'], "收盘价大于MA20 +10"),
        (last['ma20'] > prev['ma20'], "今日MA20>昨日MA20 +10"),
        # New high versus the preceding bars (today excluded from the max).
        (last['high'] > df['high'].iloc[-20:-1].max(), "20日新高点 +10"),
    ]
    for passed, label in ten_point_rules:
        if passed:
            score += 10
            score_list.append(label)

    # Volume/price rule: direction of the close change combined with v.
    if p > 0:
        if v > 1.5:
            score += 15
            score_list.append("健康上涨 +15")
        elif v >= 1:
            score += 5
            score_list.append("温和放量 +5")
        else:
            score -= 10
            score_list.append("缩量背离 -10")
    else:
        if v >= 2:
            score -= 20
            score_list.append("恐慌下跌 -20")
        elif v >= 1:
            score -= 10
            score_list.append("放量下跌 -10")
        else:
            score_list.append("缩量整理 0")
    return [score, score_list, last.to_dict(), df.iloc[-5:].to_dict('records')]





# Demo: score one stock, then replay the score for the previous bars by
# dropping the newest row each round. Guarded so that importing this module
# (e.g. ``from gp import GP``) does not log in or hit the network.
if __name__ == "__main__":
    gp = GP()
    try:
        co = gp.gp_code('601515')
        # gp_code returns False for an unrecognised code.
        df = gp.gupiao(co[0], co[1], days=60) if co else False
    finally:
        # Always release the baostock session, even when fetching fails.
        gp.logout()

    # gupiao returns False when there is no usable quote for today.
    if df is False:
        print("未获取到行情数据")
    else:
        df = gp.quant(df)
        for _ in range(10):
            # score_def compares the last two rows, so it needs at least two.
            if len(df) < 2:
                break
            print(score_def(df))
            df = df.drop(df.index[-1])


from gp import GP
import pandas as pd
import time

# Download the merged history for every stock listed in all.csv and store
# one CSV per code under csv/.
gp = GP()
try:
    stock_list = pd.read_csv('all.csv', dtype={'code': str})

    for index, row in stock_list.iterrows():
        c = time.time()
        # Separate name so the stock list being iterated is not shadowed.
        df = gp.gupiao(row['exchange'], row['code'], days=60)
        # gupiao returns False on failure. Truth-testing a DataFrame raises
        # ValueError, so compare against False explicitly.
        if df is not False:
            df.to_csv("csv/" + row['code'] + ".csv")
            print("已完成", row['exchange'], row['code'], "耗时", time.time() - c)
        # Pause between requests to stay gentle on the quote servers.
        time.sleep(1)
finally:
    # Release the baostock session even if a download fails midway.
    gp.logout()

csv文件循环

import numpy as np
import pandas as pd
from pathlib import Path
from gp import GP,score_def

# Score every per-stock CSV found in ./csv.
gp = GP()
csv_dir = Path("./csv")

csv_files = [*csv_dir.glob("*.csv")]
for csv_one in csv_files:
    # Load the stored bars, add the indicator columns, then score the frame.
    df = gp.quant(pd.read_csv(csv_one))
    vv = score_def(df)
# vv[0] is the score
此条目发表在None分类目录。将固定链接加入收藏夹。

发表回复