thinkcmf thinkphp6 原生JSON

code 的值 1 为success 值0为error

{
    "code": 1,
    "msg": "保存成功",
    "url": "/user/index",
    "data": {
        "name": "test"
    }
}

return $this->success('操作成功', '', $data);

三个参数依次为:提示文字、跳转 URL、返回数据。

发表在 None | 留下评论

acme.sh 添加SSL

LNMP 自带的 SSL 添加功能有问题,不如直接使用 acme.sh,反正 LNMP 底层用的也是它。

安装:

curl https://get.acme.sh | sh -s email=your@email.com
#需要设置你的邮箱
source ~/.bashrc

acme.sh -v
#查看安装情况

使用letsencrypt

acme.sh --set-default-ca --server letsencrypt

修改你的nginx的conf

设置:.well-known(lnmp 里面的需要修改)请注释掉之前的 /.well-known/ 配置,然后插入新的:

# Serve ACME HTTP-01 challenge files. The root must be the same directory
# that is passed to `acme.sh --issue -w`, otherwise validation cannot find
# the token files that acme.sh writes.
location ^~ /.well-known/acme-challenge/ {
            allow all;
            default_type text/plain;
            root /home/wwwroot/abc.cn;
        }

然后 lnmp nginx restart 实现nginx重启

acme.sh --issue -d abc.cn -w /home/wwwroot/abc.cn

安装好了会显示地址:

[Sun Apr 12 04:54:22 PM CST 2026] Your cert is in: /root/.acme.sh/abc.cn_ecc/abc.cn.cer
[Sun Apr 12 04:54:22 PM CST 2026] Your cert key is in: /root/.acme.sh/abc.cn_ecc/abc.cn.key
[Sun Apr 12 04:54:22 PM CST 2026] The intermediate CA cert is in: /root/.acme.sh/abc.cn_ecc/ca.cer
[Sun Apr 12 04:54:22 PM CST 2026] And the full-chain cert is in: /root/.acme.sh/abc.cn_ecc/fullchain.cer

#其中 abc.cn 是你的域名,证书文件位于 /root/.acme.sh/abc.cn_ecc/ 目录下

nginx下创建ssl目录

# -p: create missing parent directories and do not fail if it already exists
mkdir -p /usr/local/nginx/conf/ssl

一键移动证书到NGINX

acme.sh --install-cert -d abc.cn \
--key-file /usr/local/nginx/conf/ssl/abc.cn.key \
--fullchain-file /usr/local/nginx/conf/ssl/abc.cn.crt \
--reloadcmd "nginx -t && service nginx reload"

重新配置conf 在lnmp的域名下配置

server
{
    listen 443 ssl;
    listen [::]:443 ssl;
    server_name abc.cn;

    root  /home/wwwroot/abc.cn/public;
    index index.html index.htm index.php default.html default.htm default.php;

    # Certificate chain and private key copied here by `acme.sh --install-cert`
    ssl_certificate /usr/local/nginx/conf/ssl/abc.cn.crt;
    ssl_certificate_key /usr/local/nginx/conf/ssl/abc.cn.key;

    include rewrite/thinkphp.conf;
    include enable-php-pathinfo.conf;

    # Browser cache for images: 30 days.
    # Alternatives inside a regex group are separated by "|"; a "," would be
    # matched literally and the rule would never apply to those extensions.
    location ~ .*\.(gif|jpg|jpeg|png|bmp|swf)$
    {
        expires      30d;
    }

    # Browser cache for scripts and stylesheets: 12 hours.
    location ~ .*\.(js|css)?$
    {
        expires      12h;
    }

    # Refuse any request whose path contains a "/." segment (dot-files).
    location ~ /\.
    {
        deny all;
    }

    access_log  /home/wwwlogs/abc.cn.log;
}

配置443端口

ufw allow 443

查看443 端口

netstat -lntp | grep 443

腾讯云 阿里云的话可能要在云服务器配置中打开443 端口

发表在 None | 留下评论

Thinkphp6下两表多个键相互关联

A 表 date mcode

B表 old_date mcode

上面两个表相互关联 A一对多B

Thinkphp6 下需要写成【大道至简 使用where 和$this->键】


/**
 * Relation from this model (table A) to its QuantnModel rows (table B).
 *
 * The hasMany() relation is narrowed with two extra where() conditions:
 * B.mcode must equal this model's mcode and B.old_date must equal this
 * model's date.
 *
 * NOTE(review): the conditions read $this->mcode and $this->date at the time
 * the relation is built, so this presumably only works when called on an
 * already-loaded model instance (not with eager loading) — confirm.
 *
 * @return mixed the relation/query object returned by the where() chain
 */
public function today()
{
    return $this->hasMany(QuantnModel::class)
        ->where('mcode', $this->mcode)
        ->where('old_date', $this->date);
}

添加索引:

ALTER TABLE cmf_quantn ADD INDEX idx_mcode_old_date (mcode, old_date);

发表在 None | 留下评论

Mysql索引太好用了

以前数据库的数据量小,几万条顶天了。最近在做股票日线,差不多有 160 万条数据,没有索引时慢得不得了,在 VPS 上查一次 K 线要 2 秒起步。我一直以为是 VPS 性能太差,结果加上索引后几百毫秒就能完成。

适合查大量的数据 按照日期排列

添加一个索引 mcode 和date

ALTER TABLE cmf_quant ADD INDEX idx_mcode_date (mcode, date);

添加索引并按照分数排列

ALTER TABLE kline_day
ADD INDEX idx_date_score (trade_date, score DESC);

查看当前索引

SHOW INDEX FROM cmf_quant;

删除索引

DROP INDEX 索引名 ON 表名;

DROP INDEX idx_mcode_date ON cmf_quant;

唯一索引:UNIQUE 关键词

ALTER TABLE cmf_quant ADD UNIQUE INDEX uk_code_date (code, date); 

发表在 None | 留下评论

one tquant

one

import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import copy
import warnings
warnings.filterwarnings("ignore")  # 关掉烦人的警告

# Database connection settings passed as keyword arguments to pymysql.connect().
# NOTE(review): the credentials are hard-coded in this file — consider loading
# them from the environment or a config file kept out of version control.
config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "db",
    "charset": "utf8mb4"
}

# Module-level connection and cursor; the query/insert helpers below use
# these two names directly instead of receiving them as parameters.
conn = pymysql.connect(**config)
cursor = conn.cursor()



def get_info(mcode):
    """Load every ``cmf_quant`` row for one stock, ordered by date ascending.

    Args:
        mcode: Value matched against the ``mcode`` column (bound as a query
            parameter, not interpolated into the SQL text).

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql`` on the
        module-level ``conn``.
    """
    query = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    return pd.read_sql(query, conn, params=(mcode,))


# ===================== ✅ 极速插入:INSERT IGNORE =====================


def df_to_datalist(df):
    """Turn each DataFrame row into a 40-item tuple of insert parameters.

    The first item is the row's ``code`` with its first three characters
    removed; the remaining 39 items are the row's values for the columns
    listed in ``columns`` below, in that order.

    Args:
        df: DataFrame that contains every column named in ``columns``.

    Returns:
        A list with one tuple per row, in row order (empty for an empty frame).
    """
    columns = (
        'code', 'date', 'open', 'high', 'low', 'close', 'preclose', 'volume', 'pctChg',
        'turn', 'isST', 'adjustflag', 'ma5', 'ma10', 'ma20', 'ma60',
        'vol_ma5', 'vol_ma10', 'vol_ma20', 'vol_ma60',
        'tr', 'atr', 'rs_6', 'rsi_6', 'rs_14', 'rsi_14', 'money_ma5', 'status', 'pp',
        'kk', 'ema12', 'ema26', 'dif', 'dea', 'macd', 'rsv', 'k', 'd', 'j',
    )
    return [
        (record['code'][3:], *(record[name] for name in columns))
        for _, record in df.iterrows()
    ]

def insert_data(data_list):
    """Bulk ``INSERT IGNORE`` fully computed rows into ``cmf_quantn``.

    Uses the module-level ``cursor`` and ``conn``.

    Args:
        data_list: Sized sequence of 40-item parameter sequences, one per
            row, in the column order of the statement below.

    Returns:
        ``(insert_count, skip_count)``: the affected-row count reported by
        the cursor, and ``len(data_list)`` minus that count.

    Raises:
        Exception: whatever the driver raises; the transaction is rolled
            back before the exception propagates.
    """
    # With no rows the driver executes nothing and leaves cursor.rowcount at
    # whatever a previous statement set, so the counts would be meaningless.
    if len(data_list) == 0:
        return 0, 0

    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
        , `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
        , `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ) """

    try:
        # Duplicates are skipped by INSERT IGNORE rather than raising.
        cursor.executemany(sql, data_list)
        insert_count = cursor.rowcount
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise

    skip_count = len(data_list) - insert_count
    return insert_count, skip_count

def insert_datan(data_list):
    """Bulk ``INSERT IGNORE`` raw quote rows (no indicators) into ``cmf_quantn``.

    Uses the module-level ``cursor`` and ``conn``.

    Args:
        data_list: Sized sequence of 14-item parameter sequences, one per
            row, in the column order of the statement below (the last item
            fills ``is_now``).

    Returns:
        ``(insert_count, skip_count)``: the affected-row count reported by
        the cursor, and ``len(data_list)`` minus that count.

    Raises:
        Exception: whatever the driver raises; the transaction is rolled
            back before the exception propagates.
    """
    # With no rows the driver executes nothing and leaves cursor.rowcount at
    # whatever a previous statement set, so the counts would be meaningless.
    if len(data_list) == 0:
        return 0, 0

    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`,`is_now`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s
        ) """

    try:
        # Duplicates are skipped by INSERT IGNORE rather than raising.
        cursor.executemany(sql, data_list)
        insert_count = cursor.rowcount
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise

    skip_count = len(data_list) - insert_count
    return insert_count, skip_count



def do_coden(exchange, code):
    """Fetch today's quote for one stock and store it via ``insert_datan``.

    Calls ``gp.get_today(exchange, code)``; when the result is truthy, its
    first element is flattened into a 14-item row (``code`` without its
    first three characters, the raw quote fields, and the current Unix
    time), inserted, and a summary line is printed. A falsy result does
    nothing.

    Args:
        exchange: Passed through to ``gp.get_today``.
        code: Passed through to ``gp.get_today`` and shown in the log line.
    """
    start = time.time()
    quotes = gp.get_today(exchange, code)
    if not quotes:
        return

    row = quotes[0]
    fields = (
        'code', 'date', 'open', 'high', 'low', 'close',
        'preclose', 'volume', 'pctChg', 'turn', 'isST', 'adjustflag',
    )
    dx = [row['code'][3:]]
    dx.extend(row[name] for name in fields)
    dx.append(int(time.time()))

    insert_count, skip_count = insert_datan([dx])
    print(f"股票 {code} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")

def insert_all(info,code_str):
    """Store one batch of fetched quotes, with and without indicators.

    For every item in ``info`` the first element is taken as the quote. Its
    ``code`` is stripped and split into characters ``[4:10]`` (``mcode``) and
    ``[2:4] + "." + [4:10]`` (dotted code) — presumably input such as
    ``"v_sh600000"``; confirm against ``gp.gp_all``. The quote is appended
    to that stock's ``get_info`` history, run through ``gp.quant``, cleaned
    of inf/NaN (replaced by 0), and only the last resulting row is written
    with ``insert_data``. The raw quotes of the whole batch are then written
    in one ``insert_datan`` call and a summary line is printed.

    Args:
        info: Iterable of indexable items whose element 0 is a mapping with
            the quote fields; a falsy value makes the function a no-op.
        code_str: Only used in the printed summary line.
    """
    start = time.time()
    if not info:
        return

    raw_fields = (
        'date', 'open', 'high', 'low', 'close',
        'preclose', 'volume', 'pctChg', 'turn', 'isST', 'adjustflag',
    )
    dx = []
    for item in info:
        row = item[0]
        cv = copy.deepcopy(row)
        stripped = row['code'].strip()
        mcode = stripped[4:10]
        dotted = stripped[2:4] + "." + stripped[4:10]
        cv['mcode'] = mcode
        cv['code'] = dotted

        latest = pd.DataFrame([cv])
        history = get_info(cv['mcode'])
        merged = gp.quant(pd.concat([history, latest], ignore_index=True))
        merged = merged.replace([float('inf'), -float('inf')], 0).fillna(0)

        # Only the newest row (the quote just appended) is inserted.
        insert_count, skip_count = insert_data(df_to_datalist(merged)[-1:])
        print(insert_count, skip_count)

        raw_row = [mcode, dotted]
        raw_row.extend(row[name] for name in raw_fields)
        raw_row.append(int(time.time()))
        dx.append(raw_row)

    insert_count, skip_count = insert_datan(dx)
    print(f"股票 {code_str} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")

gp=GP(False)

# # info=gp.get_today("sh", "600000")
# # print(info)
# do_coden("sh", "600000")

# Driver: read every stock from all.csv, request quotes in batches of 25
# codes, and store each batch with insert_all().
df = pd.read_csv('all.csv', dtype={'code': str})
batch_size = 25        # codes per gp.gp_all() request
min_batch_seconds = 4  # each flush takes at least this long (request throttle)
i=0
code_str=''
for index, row in df.iterrows():
    # Add the current row BEFORE checking whether the batch is full, so the
    # row that completes a batch is part of it instead of being dropped.
    # Codes are prepended, each followed by a comma (same format as before).
    code_str=row['exchange']+row['code']+","+code_str
    i=i+1
    if i<batch_size:
        continue

    c=time.time()
    print(code_str)
    ccc=gp.gp_all(code_str)
    # Pass the batch string before it is cleared so the log line names it.
    insert_all(ccc,code_str)
    i=0
    code_str=''
    x=min_batch_seconds-(time.time()-c)
    if x>0:
        time.sleep(x)

# Flush the final partial batch; skip the request when nothing is pending.
if code_str:
    ccc=gp.gp_all(code_str)
    insert_all(ccc,code_str)

tquant

import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import json
from rules import rules
import warnings
warnings.filterwarnings("ignore")  # 关掉烦人的警告

# GP comes from the project-local `gp` module; what the False argument
# selects is not visible in this file.
gp=GP(False)
# Database connection settings passed as keyword arguments to pymysql.connect().
# NOTE(review): the credentials are hard-coded in this file — consider loading
# them from the environment or a config file kept out of version control.
config = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",
    "database": "db",
    "charset": "utf8mb4"
}

# Module-level connection and cursor; the query/insert/update helpers below
# use these two names directly instead of receiving them as parameters.
conn = pymysql.connect(**config)
cursor = conn.cursor()

def df_to_datalist(df):
    """Turn each DataFrame row into a 40-item tuple of insert parameters.

    The first item is the row's ``code`` with its first three characters
    removed; the remaining 39 items are the row's values for the columns
    listed in ``columns`` below, in that order.

    Args:
        df: DataFrame that contains every column named in ``columns``.

    Returns:
        A list with one tuple per row, in row order (empty for an empty frame).
    """
    columns = (
        'code', 'date', 'open', 'high', 'low', 'close', 'preclose', 'volume', 'pctChg',
        'turn', 'isST', 'adjustflag', 'ma5', 'ma10', 'ma20', 'ma60',
        'vol_ma5', 'vol_ma10', 'vol_ma20', 'vol_ma60',
        'tr', 'atr', 'rs_6', 'rsi_6', 'rs_14', 'rsi_14', 'money_ma5', 'status', 'pp',
        'kk', 'ema12', 'ema26', 'dif', 'dea', 'macd', 'rsv', 'k', 'd', 'j',
    )
    return [
        (record['code'][3:], *(record[name] for name in columns))
        for _, record in df.iterrows()
    ]

def insert_data(data_list):
    """Bulk ``INSERT IGNORE`` fully computed rows into ``cmf_quantn``.

    Uses the module-level ``cursor`` and ``conn``.

    Args:
        data_list: Sized sequence of 40-item parameter sequences, one per
            row, in the column order of the statement below.

    Returns:
        ``(insert_count, skip_count)``: the affected-row count reported by
        the cursor, and ``len(data_list)`` minus that count.

    Raises:
        Exception: whatever the driver raises; the transaction is rolled
            back before the exception propagates.
    """
    # With no rows the driver executes nothing and leaves cursor.rowcount at
    # whatever a previous statement set, so the counts would be meaningless.
    if len(data_list) == 0:
        return 0, 0

    sql = """
        INSERT IGNORE INTO `cmf_quantn` 
        ( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
        , `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
        , `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
        , `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
        ) 
        VALUES 
        (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        ) """

    try:
        # Duplicates are skipped by INSERT IGNORE rather than raising.
        cursor.executemany(sql, data_list)
        insert_count = cursor.rowcount
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise

    skip_count = len(data_list) - insert_count
    return insert_count, skip_count



def get_info(mcode):
    """Load every ``cmf_quant`` row for one stock, ordered by date ascending.

    Args:
        mcode: Value matched against the ``mcode`` column (bound as a query
            parameter, not interpolated into the SQL text).

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql`` on the
        module-level ``conn``.
    """
    query = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    return pd.read_sql(query, conn, params=(mcode,))

def get_info_t(mcode):
    """Load the single most recent ``cmf_quantn`` row for one stock.

    Rows are ordered by ``date`` descending and limited to one, so the frame
    holds at most one row.

    Args:
        mcode: Value matched against the ``mcode`` column (bound as a query
            parameter, not interpolated into the SQL text).

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_sql`` on the
        module-level ``conn``.
    """
    query = "SELECT * FROM cmf_quantn WHERE mcode = %s ORDER BY date DESC LIMIT 1"
    return pd.read_sql(query, conn, params=(mcode,))

def get_score(df):
    """Score the last row of ``df`` against every entry in ``rules``.

    Each rule is a mapping with a ``check`` callable (called with the whole
    frame), a ``score`` that is added to the total when the check is truthy,
    and a ``name`` that is recorded for every matching rule.

    Args:
        df: DataFrame with ``id``, ``code`` and ``date`` columns.

    Returns:
        ``[id, code, date, score, matched_rule_names]`` taken from the last
        row, or ``False`` when the frame has 90 rows or fewer.
    """
    if len(df) <= 90:
        return False

    total = 0
    date = df.iloc[-1]['date']
    matched_names = []
    code = df.iloc[-1]['code']
    for rule in rules:
        if not rule['check'](df):
            continue
        total = total + rule['score']
        matched_names.append(rule['name'])

    row_id = df.iloc[-1]['id']
    return [row_id, code, date, total, matched_names]

def update_score(info):
    """Write one score result onto its ``cmf_quantn`` row.

    Uses the module-level ``cursor`` and ``conn``.

    Args:
        info: Sequence laid out as ``[id, code, date, score, score_text]``
            (index 0, 3 and 4 are used), or a falsy value.

    Returns:
        ``True`` when the UPDATE was executed and committed. ``False`` when
        ``info`` is falsy, or when anything in the update failed — in that
        case the transaction is rolled back and the error is printed.
    """
    if not info:
        return False

    sql="UPDATE cmf_quantn SET score=%s,score_text=%s  WHERE id=%s"
    try:
        # score_text is stored as a JSON string; ensure_ascii=False keeps
        # non-ASCII rule names readable in the database.
        params = [info[3], json.dumps(info[4], ensure_ascii=False), info[0]]
        cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        conn.rollback()
        # Still best-effort (the caller only gets False), but the failure
        # is reported instead of disappearing silently.
        print(f"update_score failed for {info!r}: {e!r}")
        return False
    return True

# Driver: for each stock in all.csv, combine its cmf_quant history with its
# newest cmf_quantn row, recompute indicators, and store the score of the
# newest row.
dfc = pd.read_csv('all.csv', dtype={'code': str})
for index, row in dfc.iterrows():
    code=row['code']
    df=get_info(code)
    df_t=get_info_t(code)
    # Both the history and the latest cmf_quantn row are required.
    if df.empty or df_t.empty:
        continue

    # Kept in its own name: the loop previously rebound `dfc`, the frame
    # being iterated, which only worked because iterrows() had already been
    # created from the original object.
    merged=pd.concat([df,df_t],ignore_index=True)
    if len(merged)<=90:
        continue

    # Score for the newest row only.
    merged = gp.quant(merged)
    score_list=get_score(merged)
    print(score_list)
    if score_list:
        update_score(score_list)
        # NOTE(review): this inserts the last row of the cmf_quant history
        # (`df`), not of the recomputed `merged` frame — confirm intent.
        data_list = df_to_datalist(df)
        insert_count, skip_count = insert_data(data_list[-1:])
        print(code,insert_count, skip_count)
    # NOTE(review): this stops after the first stock with more than 90 rows,
    # so the remaining stocks in all.csv are never scored. It looks like a
    # debugging leftover — remove it to process every stock.
    break

cursor.close()
conn.close()
发表在 None | 留下评论