code 的取值:1 表示 success(成功),0 表示 error(失败)。下面是成功时的返回示例:
{
"code": 1,
"msg": "保存成功",
"url": "/user/index",
"data": {
"name": "test"
}
}
return $this->success('操作成功', '', $data);
三个参数依次为:提示文字、跳转 URL、返回数据。
code 的取值:1 表示 success(成功),0 表示 error(失败)。下面是成功时的返回示例:
{
"code": 1,
"msg": "保存成功",
"url": "/user/index",
"data": {
"name": "test"
}
}
return $this->success('操作成功', '', $data);
三个参数依次为:提示文字、跳转 URL、返回数据。
LNMP 自带的 SSL 添加功能有问题,不如直接使用 acme.sh(反正 LNMP 底层用的也是它)。
安装:
curl https://get.acme.sh | sh -s email=your@email.com
#需要设置你的邮箱
source ~/.bashrc
acme.sh -v
#查看安装情况
使用letsencrypt
acme.sh --set-default-ca --server letsencrypt
修改你的nginx的conf
设置 .well-known(LNMP 自带配置里的需要修改):请先注释掉原有的 /.well-known/ 配置,然后插入下面这段:
location ^~ /.well-known/acme-challenge/ {
    # Serve ACME HTTP-01 challenge files. The root must be the same directory
    # that is passed to `acme.sh --issue -w ...`, otherwise validation fails.
    allow all;
    default_type text/plain;
    root /home/wwwroot/abc.cn;
}
然后 lnmp nginx restart 实现nginx重启
acme.sh --issue -d abc.cn -w /home/wwwroot/abc.cn
安装好了会显示地址:
[Sun Apr 12 04:54:22 PM CST 2026] Your cert is in: /root/.acme.sh/abc.cn_ecc/abc.cn.cer
[Sun Apr 12 04:54:22 PM CST 2026] Your cert key is in: /root/.acme.sh/abc.cn_ecc/abc.cn.key
[Sun Apr 12 04:54:22 PM CST 2026] The intermediate CA cert is in: /root/.acme.sh/abc.cn_ecc/ca.cer
[Sun Apr 12 04:54:22 PM CST 2026] And the full-chain cert is in: /root/.acme.sh/abc.cn_ecc/fullchain.cer
#其中 abc.cn 请换成你自己的域名;证书文件都保存在 root 用户的 ~/.acme.sh/ 目录下
nginx下创建ssl目录
mkdir /usr/local/nginx/conf/ssl
一键移动证书到NGINX
acme.sh --install-cert -d abc.cn \
--key-file /usr/local/nginx/conf/ssl/abc.cn.key \
--fullchain-file /usr/local/nginx/conf/ssl/abc.cn.crt \
--reloadcmd "nginx -t && service nginx reload"
重新配置conf 在lnmp的域名下配置
# HTTPS virtual host for abc.cn (ThinkPHP application served from /public).
server
{
    listen 443 ssl;
    listen [::]:443 ssl;
    server_name abc.cn;
    root /home/wwwroot/abc.cn/public;
    index index.html index.htm index.php default.html default.htm default.php;

    # Certificate and key installed by `acme.sh --install-cert`
    ssl_certificate /usr/local/nginx/conf/ssl/abc.cn.crt;
    ssl_certificate_key /usr/local/nginx/conf/ssl/abc.cn.key;

    include rewrite/thinkphp.conf;
    include enable-php-pathinfo.conf;

    # Regex alternatives must be separated by "|"; a comma is matched literally,
    # so "jpeg,png,bmp,swf" would never match .png/.bmp/.swf files.
    location ~ .*\.(gif|jpg|jpeg|png|bmp|swf)$
    {
        expires 30d;
    }

    location ~ .*\.(js|css)?$
    {
        expires 12h;
    }

    # Block access to dot-files and dot-directories.
    location ~ /\.
    {
        deny all;
    }

    access_log /home/wwwlogs/abc.cn.log;
}
配置443端口
ufw allow 443
查看443 端口
netstat -lntp | grep 443
腾讯云 阿里云的话可能要在云服务器配置中打开443 端口
A 表字段:date、mcode
B 表字段:old_date、mcode
两张表相互关联,A 对 B 是一对多(A.mcode = B.mcode 且 A.date = B.old_date)。
ThinkPHP 6 下需要写成下面这样【大道至简:使用 where 和 $this->字段名】:
/**
 * One-to-many relation from this model to QuantnModel rows.
 *
 * The related rows are restricted to those whose `mcode` equals this
 * record's `mcode` and whose `old_date` equals this record's `date`.
 *
 * NOTE(review): the filter values are read from $this when the relation is
 * built, so this presumably only works on an already-loaded instance and not
 * with eager loading — confirm.
 */
public function today()
{
    return $this->hasMany(QuantnModel::class)
        ->where('mcode', $this->mcode)
        ->where('old_date', $this->date);
}
添加索引:
ALTER TABLE cmf_quantn ADD INDEX idx_mcode_old_date (mcode, old_date);
以前数据库数据量小,顶多几万条。最近在做股票日线,差不多有 160 万条数据,没有索引时慢得不得了,在 VPS 上查一次 K 线 2 秒起步。我一直以为是 VPS 性能太差,结果加上索引后几百毫秒就能完成。
适合查大量的数据 按照日期排列
添加一个索引 mcode 和date
ALTER TABLE cmf_quant ADD INDEX idx_mcode_date (mcode, date);
添加索引并按照分数排列
ALTER TABLE kline_day
ADD INDEX idx_date_score (trade_date, score DESC);
查看当前索引
SHOW INDEX FROM cmf_quant;
删除索引
DROP INDEX 索引名 ON 表名;
DROP INDEX idx_mcode_date ON cmf_quant;
唯一索引:UNIQUE 关键词
ALTER TABLE cmf_quant ADD UNIQUE INDEX uk_code_date (code, date);
one
import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import copy
import warnings
warnings.filterwarnings("ignore") # 关掉烦人的警告
# Database connection settings
config = dict(
    host="localhost",
    port=3306,
    user="root",
    password="123456",
    database="db",
    charset="utf8mb4",
)
# Module-level connection and cursor used by the helper functions in this script.
conn = pymysql.connect(**config)
cursor = conn.cursor()
def get_info(mcode):
    """Return every ``cmf_quant`` row for ``mcode`` as a DataFrame, oldest date first."""
    query = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    return pd.read_sql(query, conn, params=(mcode,))
# ===================== ✅ 极速插入:INSERT IGNORE =====================
def df_to_datalist(df):
    """Turn each DataFrame row into a 40-value tuple for ``insert_data``.

    The first value is ``code[3:]`` (the code with its first three characters
    dropped); it is followed by the 39 columns listed below, in this order.
    """
    columns = (
        'code', 'date', 'open', 'high', 'low', 'close', 'preclose', 'volume', 'pctChg',
        'turn', 'isST', 'adjustflag', 'ma5', 'ma10', 'ma20', 'ma60',
        'vol_ma5', 'vol_ma10', 'vol_ma20', 'vol_ma60',
        'tr', 'atr', 'rs_6', 'rsi_6', 'rs_14', 'rsi_14', 'money_ma5', 'status', 'pp',
        'kk', 'ema12', 'ema26', 'dif', 'dea', 'macd', 'rsv', 'k', 'd', 'j',
    )
    return [
        (record['code'][3:], *(record[name] for name in columns))
        for _, record in df.iterrows()
    ]
def insert_data(data_list):
    """Bulk-insert fully computed indicator rows into ``cmf_quantn``.

    The statement is ``INSERT IGNORE``, so rows the server rejects as
    duplicates are skipped instead of raising.

    Args:
        data_list: Sequence of 40-value rows in the column order of the
            statement below (the shape produced by ``df_to_datalist``).

    Returns:
        ``(insert_count, skip_count)`` where ``insert_count`` is the cursor's
        affected-row count and ``skip_count`` is the remainder of the batch.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back.
    """
    # pymysql's executemany() returns early for an empty sequence and leaves
    # cursor.rowcount at its previous value, which would yield stale counts.
    if not data_list:
        return 0, 0
    sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
, `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
, `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
) """
    try:
        # One batched statement; duplicates are skipped by INSERT IGNORE.
        cursor.executemany(sql, data_list)
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise
    # The affected-row count is the number of rows actually inserted.
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count
def insert_datan(data_list):
    """Bulk-insert raw quote rows (no indicators) into ``cmf_quantn``.

    The statement is ``INSERT IGNORE``, so rows the server rejects as
    duplicates are skipped instead of raising.

    Args:
        data_list: Sequence of 14-value rows: mcode, code, date, open, high,
            low, close, preclose, volume, pctChg, turn, isST, adjustflag,
            is_now.

    Returns:
        ``(insert_count, skip_count)`` where ``insert_count`` is the cursor's
        affected-row count and ``skip_count`` is the remainder of the batch.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back.
    """
    # pymysql's executemany() returns early for an empty sequence and leaves
    # cursor.rowcount at its previous value, which would yield stale counts.
    if not data_list:
        return 0, 0
    sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`,`is_now`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s
) """
    try:
        # One batched statement; duplicates are skipped by INSERT IGNORE.
        cursor.executemany(sql, data_list)
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise
    # The affected-row count is the number of rows actually inserted.
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count
def do_coden(exchange, code):
    """Fetch today's quote for one stock via ``gp.get_today`` and store it.

    Only the first returned record is used. It is written through
    ``insert_datan`` with the current Unix time as the last value, and a
    one-line summary is printed. Nothing happens when no quote is returned.
    """
    start = time.time()
    quotes = gp.get_today(exchange, code)
    if not quotes:
        return
    latest = quotes[0]
    fields = (
        'code', 'date', 'open', 'high', 'low', 'close',
        'preclose', 'volume', 'pctChg', 'turn', 'isST', 'adjustflag',
    )
    # Leading value is the code without its first three characters.
    record = [latest['code'][3:]]
    record.extend(latest[name] for name in fields)
    record.append(int(time.time()))
    insert_count, skip_count = insert_datan([record])
    print(f"股票 {code} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
def insert_all(info, code_str):
    """Store one batch of quotes returned by ``gp.gp_all``.

    For every entry the first element is taken as the quote record. The quote
    is appended to that stock's ``cmf_quant`` history, ``gp.quant`` is run over
    the combined frame, and only the last resulting row is inserted with
    ``insert_data``. The raw quotes of the whole batch are then inserted with
    ``insert_datan`` and a summary line is printed.

    Does nothing when ``info`` is falsy. ``code_str`` is used only in the
    summary line.
    """
    start = time.time()
    if not info:
        return
    raw_rows = []
    for entry in info:
        quote = entry[0]
        enriched = copy.deepcopy(quote)
        raw_code = quote['code'].strip()
        mcode = raw_code[4:10]
        dotted_code = raw_code[2:4] + "." + raw_code[4:10]
        enriched['mcode'] = mcode
        enriched['code'] = dotted_code

        latest_frame = pd.DataFrame([enriched])
        history = get_info(mcode)
        frame = pd.concat([history, latest_frame], ignore_index=True)
        frame = gp.quant(frame)
        # Replace infinities and NaN with 0 before building the insert rows.
        frame = frame.replace([float('inf'), -float('inf')], 0)
        frame = frame.fillna(0)

        rows = df_to_datalist(frame)
        insert_count, skip_count = insert_data(rows[-1:])
        print(insert_count, skip_count)

        raw_rows.append([
            mcode, dotted_code, quote['date'], quote['open'], quote['high'],
            quote['low'], quote['close'], quote['preclose'], quote['volume'],
            quote['pctChg'], quote['turn'], quote['isST'], quote['adjustflag'],
            int(time.time()),
        ])
    insert_count, skip_count = insert_datan(raw_rows)
    print(f"股票 {code_str} → 插入:{insert_count}条 | 跳过:{skip_count}条 | 耗时:{time.time()-start:.2f}s")
gp = GP(False)
# Single-stock debugging example: do_coden("sh", "600000")

# Read the stock list; the 'code' column is forced to str.
df = pd.read_csv('all.csv', dtype={'code': str})

# Codes are requested in batches of 25, and each batch is padded to take at
# least 4 seconds.
i = 0
code_str = ''
for index, row in df.iterrows():
    # Add the current row BEFORE deciding whether to flush. The previous
    # version flushed in an else-branch without adding the row, silently
    # dropping every 26th stock.
    code_str = row['exchange'] + row['code'] + "," + code_str
    i = i + 1
    if i < 25:
        continue

    c = time.time()
    print(code_str)
    ccc = gp.gp_all(code_str)
    # Pass the batch string before clearing it so the summary log is not empty.
    insert_all(ccc, code_str)
    i = 0
    code_str = ''
    x = 4 - (time.time() - c)
    if x > 0:
        time.sleep(x)

# Flush the final partial batch, if there is one.
if code_str:
    ccc = gp.gp_all(code_str)
    insert_all(ccc, code_str)
tquant
import pymysql
from pymysql.err import OperationalError, ProgrammingError
from gp import GP
import pandas as pd
import time
import json
from rules import rules
import warnings
warnings.filterwarnings("ignore") # 关掉烦人的警告
gp = GP(False)

# Database connection settings
config = dict(
    host="localhost",
    port=3306,
    user="root",
    password="123456",
    database="db",
    charset="utf8mb4",
)
# Module-level connection and cursor used by the helper functions in this script.
conn = pymysql.connect(**config)
cursor = conn.cursor()
def df_to_datalist(df):
    """Turn each DataFrame row into a 40-value tuple for ``insert_data``.

    The first value is ``code[3:]`` (the code with its first three characters
    dropped); it is followed by the 39 columns listed below, in this order.
    """
    columns = (
        'code', 'date', 'open', 'high', 'low', 'close', 'preclose', 'volume', 'pctChg',
        'turn', 'isST', 'adjustflag', 'ma5', 'ma10', 'ma20', 'ma60',
        'vol_ma5', 'vol_ma10', 'vol_ma20', 'vol_ma60',
        'tr', 'atr', 'rs_6', 'rsi_6', 'rs_14', 'rsi_14', 'money_ma5', 'status', 'pp',
        'kk', 'ema12', 'ema26', 'dif', 'dea', 'macd', 'rsv', 'k', 'd', 'j',
    )
    return [
        (record['code'][3:], *(record[name] for name in columns))
        for _, record in df.iterrows()
    ]
def insert_data(data_list):
    """Bulk-insert fully computed indicator rows into ``cmf_quantn``.

    The statement is ``INSERT IGNORE``, so rows the server rejects as
    duplicates are skipped instead of raising.

    Args:
        data_list: Sequence of 40-value rows in the column order of the
            statement below (the shape produced by ``df_to_datalist``).

    Returns:
        ``(insert_count, skip_count)`` where ``insert_count`` is the cursor's
        affected-row count and ``skip_count`` is the remainder of the batch.

    Raises:
        Exception: Any database error is re-raised after the transaction has
            been rolled back.
    """
    # pymysql's executemany() returns early for an empty sequence and leaves
    # cursor.rowcount at its previous value, which would yield stale counts.
    if not data_list:
        return 0, 0
    sql = """
INSERT IGNORE INTO `cmf_quantn`
( `mcode`, `code`, `date`, `open`, `high`, `low`, `close`, `preclose`, `volume`, `pctChg`
, `turn`, `isST`, `adjustflag`, `ma5`, `ma10`, `ma20`, `ma60`, `vol_ma5`, `vol_ma10`, `vol_ma20`
, `vol_ma60`, `tr`, `atr`, `rs_6`, `rsi_6`, `rs_14`, `rsi_14`, `money_ma5`, `status`, `pp`
, `kk`, `ema12`, `ema26`, `dif`, `dea`, `macd`, `rsv`, `k`, `d`, `j`
)
VALUES
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
) """
    try:
        # One batched statement; duplicates are skipped by INSERT IGNORE.
        cursor.executemany(sql, data_list)
        conn.commit()
    except Exception:
        # Do not leave a half-applied batch pending on the shared connection.
        conn.rollback()
        raise
    # The affected-row count is the number of rows actually inserted.
    insert_count = cursor.rowcount
    skip_count = len(data_list) - insert_count
    return insert_count, skip_count
def get_info(mcode):
    """Return every ``cmf_quant`` row for ``mcode`` as a DataFrame, oldest date first."""
    query = "SELECT * FROM cmf_quant WHERE mcode = %s ORDER BY date ASC"
    return pd.read_sql(query, conn, params=(mcode,))
def get_info_t(mcode):
    """Return the single most recent ``cmf_quantn`` row for ``mcode`` as a DataFrame.

    The original comment describes this as reading the last Tencent row.
    """
    query = "SELECT * FROM cmf_quantn WHERE mcode = %s ORDER BY date DESC LIMIT 1"
    return pd.read_sql(query, conn, params=(mcode,))
def get_score(df):
    """Score the last row of ``df`` against every entry in ``rules``.

    Each rule is a mapping with a ``check`` callable, a ``score`` and a
    ``name``. When ``check(df)`` is truthy the rule's score is added to the
    total and its name is recorded.

    Returns:
        ``[id, code, date, score, matched_rule_names]`` taken from the last
        row, or ``False`` when ``df`` has 90 rows or fewer.
    """
    if len(df) <= 90:
        return False

    last_date = df.iloc[-1]['date']
    last_code = df.iloc[-1]['code']
    total = 0
    matched_names = []
    for rule in rules:
        if not rule['check'](df):
            continue
        total = total + rule['score']
        matched_names.append(rule['name'])

    return [df.iloc[-1]['id'], last_code, last_date, total, matched_names]
def update_score(info):
    """Write a score computed by ``get_score`` onto its ``cmf_quantn`` row.

    Args:
        info: ``[id, code, date, score, score_text]`` as returned by
            ``get_score``, or a falsy value when there is nothing to store.
            ``score_text`` is serialised to JSON without ASCII escaping.

    Returns:
        True when the UPDATE was committed. False when ``info`` is falsy or
        the update failed; in that case the transaction is rolled back and
        the error is printed rather than silently discarded.
    """
    if not info:
        return False
    sql = "UPDATE cmf_quantn SET score=%s,score_text=%s WHERE id=%s"
    try:
        params = [info[3], json.dumps(info[4], ensure_ascii=False), info[0]]
        cursor.execute(sql, params)
        conn.commit()
    except Exception as exc:
        conn.rollback()  # undo the failed statement
        # Best-effort semantics are kept (return False), but the cause is reported.
        print(f"update_score failed for {info!r}: {exc!r}")
        return False
    return True
# Driver: for each code in all.csv, combine its cmf_quant history with the
# latest cmf_quantn row, score the combined frame, and store the result.
# NOTE(review): the original indentation of this script was lost; the nesting
# below is a best-effort reconstruction and should be confirmed.
dfc = pd.read_csv('all.csv', dtype={'code': str})
for index, row in dfc.iterrows():
    code=row['code']
    c=time.time()
    df=get_info(code)
    df_t=get_info_t(code)
    # Tencent
    if (not df.empty) and (not df_t.empty):
        # NOTE(review): this rebinds dfc, the name of the frame whose
        # iterrows() is being looped over; the loop presumably keeps iterating
        # the original frame — confirm, and consider a distinct name.
        dfc=pd.concat([df,df_t],ignore_index=True)
        num=len(dfc)
        if num>90:
            # Latest score
            dfc = gp.quant(dfc)
            # print(dfc.iloc[-1])
            # break
            score_list=get_score(dfc)
            print(score_list)
            if score_list:
                update_score(score_list)
            # NOTE(review): rows are built from df (the cmf_quant history),
            # not from the gp.quant() result in dfc — confirm this is intended.
            data_list = df_to_datalist(df)
            insert_count, skip_count = insert_data(data_list[-1:])
            print(code,insert_count, skip_count)
            # NOTE(review): assumed to sit in this branch, ending the loop
            # after the first processed code; looks like a debugging leftover.
            break
    # print(score_list[1],time.time()-c)
    # Full-history scoring
    # while len(df) >90:
    # score_list=get_score(df)
    # if score_list:
    # update_score(score_list)
    # df=df.iloc[:-1]
    # print(time.time()-c)
# Release the module-level cursor and connection.
cursor.close()
conn.close()