当前位置: 首页 > news >正文

Python 算法交易实验81 QTV200日常推进-重新实验SMA/EMA/RSI

说明

本次实验考虑两个点:

  • 1 按照上一篇谈到的业务目标进行反推,有针对性的寻找策略
  • 2 worker增加计算的指标,重新计算之前的实验

内容

工具方面,感觉rabbitmq还是太慢了。看了下,rabbitmq主要还是面向可靠和灵活路由的。目前我的需求虽然不是很需要速度,但是这吞吐太低了实在难受。特别是从结果队列里取数,一条一条的。

终于可以接着继续写了,这周还是很忙,不过成果不错。比较开心小伙伴帮我搞定了kafka,我这个java绝缘体真的是只要搞java就碰到坑。在kafka的过程中,对性能比较关注,实测下来发现json序列化的开销实在是很大。在消息的传输过程中,为了通用性,我对数据都进行了json传输,时间占比在整体80%以上。

与实验相关的是:这次打算开多个worker快速跑数,一个本能是想利用缓存–Redis。后来一试发现比clickhouse慢了一个量级,前后一想就对上了—序列化太费了。

如果数据是结构化的,尽量直接使用结构化数据库

另外正好最近在转向ORM方式,突然发现实验要保存的数据通过这种方式更加合适。

1 结果对象

先定义结果的数据模型,然后使用mysql进行全局存储。

以下按照之前的计划,生成了短、中、长期指标。

from Basefuncs import * from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetime
import shortuuid
def get_shortuuid(charset=None):"""生成一个简洁的唯一标识符(短 UUID)。参数:charset (str, optional): 自定义的字符集。如果未提供,将使用默认字符集。返回:str: 生成的短 UUID。"""if charset:su = shortuuid.ShortUUID(charset=charset)return su.uuid()else:return shortuuid.uuid()m7_24013_url = f"mysql+pymysql://USER:PASSWD@IP:PORT/mydb"# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
m7_engine = create_engine(m7_24013_url)# 创建基类
Base = declarative_base()# 定义数据类型
class SMAEMABacktest(Base):__tablename__  = 'smaema_backtest'id = Column(Integer, primary_key=True,autoincrement=True)short_uuid = Column(String(50), default = lambda: get_shortuuid())strategy_name = Column(String(50))long_t = Column(Integer)short_t = Column(Integer)deals = Column(Integer)# 交易时间median_trade_days = Column(Integer)# 单利润率total_mean_npr = Column(Float)last3_mean_npr = Column(Float)last1_mean_npr = Column(Float)# 盈亏比total_win_loss_ratio = Column(Float)last3_win_loss_ratio = Column(Float)last1_win_loss_ratio = Column(Float)# 胜率total_win_rate = Column(Float)last3_win_rate = Column(Float)last1_win_rate = Column(Float)  get_data_duration = Column(Float)process_data_duration = Column(Float)create_time = Column(DateTime, default=lambda: datetime.now())#def __init__(self,**kwargs):# self.short_uuid = get_shortuuid()# for key, value in kwargs.items():#     if key in ['create_time'] and isinstance(value, str):#         value = datetime.strptime(value, '%Y-%m-%d %H:%M:%S')#     setattr(self, key, value)# 创建表
Base.metadata.create_all(m7_engine)

开始读取数据, 70万条数据1.15秒从数据库取出(如果用redis我记得大约是8秒)

short_t = 100
long_t = 200# sma
sma_result = SMAEMABacktest(strategy_name ='sma', long_t = long_t, short_t = short_t)
# ema 
ema_result = SMAEMABacktest(strategy_name ='ema', long_t = long_t, short_t = short_t)ch_cfg = CHCfg(database=clickhouse_db, host = clickhouse_ip )
chc =  CHClient(**ch_cfg.dict())query_sql = f'select {clickhouse_select_cols_str} from {clickhouse_table_name}'
tick1 = time.time()
data = chc._exe_sql(query_sql)
tick2 = time.time()
print('getting data %.2f' % (tick2 - tick1 ))
getting data 1.15

主程序如下: 当worker开始执行时去队列取数,然后把结果写到数据库里。

大体上是这样的效果:
在这里插入图片描述

2 处理逻辑

def process(input_smaema = None, Session = Session):gfgo_lite_server = input_smaema.gfgo_lite_servershort_t = input_smaema.short_tlong_t = input_smaema.long_tclickhouse_db = input_smaema.clickhouse_dbclickhouse_ip = input_smaema.clickhouse_ipclickhouse_table_name = input_smaema.clickhouse_table_nameclickhouse_select_cols = input_smaema.clickhouse_select_colsclickhouse_select_cols_str = input_smaema.clickhouse_select_cols_strucs = UCS(gfgo_lite_server =gfgo_lite_server)# Base对象不是pydantic允许属性为空# smasma_result = SMAEMABacktest(strategy_name ='sma', long_t = long_t, short_t = short_t)# ema ema_result = SMAEMABacktest(strategy_name ='ema', long_t = long_t, short_t = short_t)ch_cfg = CHCfg(database=clickhouse_db, host = clickhouse_ip )chc =  CHClient(**ch_cfg.dict())query_sql = f'select {clickhouse_select_cols_str} from {clickhouse_table_name}'tick1 = time.time()data = chc._exe_sql(query_sql)tick2 = time.time()print('getting data %.2f' % (tick2 - tick1 ))t1 = short_tt2 = long_tdf = pd.DataFrame(data, columns = ['shard','part','block','brick','code','amt','close','high','low','open', 'data_dt', 'pid', 'ts','vol'])df1 = df.sort_values(['data_dt'])'''bfill 是 "backward fill" 的缩写,是一种处理缺失值的方法。在数据处理中,当数据序列中存在缺失值(NaN,即 "Not a Number")时,bfill 方法会用该缺失值后面的有效值来填充这个缺失值。具体来说,bfill 方法从序列的末尾开始向前查找,找到第一个非缺失值,并用这个值来填充前面的缺失值。这个过程会一直进行,直到所有的缺失值都被填充完毕或者到达序列的开头。'''df1['sma_t1'] = df1['close'].rolling(window=t1).mean().bfill()df1['sma_t2'] = df1['close'].rolling(window=t2).mean().bfill()# 计算指数移动平均线df1['ema_t1'] = df1['close'].ewm(span=t1, adjust=False).mean()df1['ema_t2'] = df1['close'].ewm(span=t2, adjust=False).mean()# 回测bt_df = df1 import tqdm# -----------------------  smaopen_orders = []close_orders = []for i in tqdm.tqdm(range(len(bt_df))):tem_dict = dict(bt_df.iloc[i])data_dt = tem_dict['data_dt']close = tem_dict['close']sma_t1 = tem_dict['sma_t1']sma_t2 = tem_dict['sma_t2']if len(open_orders) == 0:if sma_t1 >  sma_t2:# 金叉order_dict =  {}order_dict['but_dt'] = data_dtorder_dict['buy_price'] = closeopen_orders.append(order_dict)continueif len(open_orders):if  sma_t1< sma_t2: # 死叉for  item in reversed(open_orders):order_dict = open_orders.pop()order_dict['sell_dt'] = data_dtorder_dict['sell_price'] = closeorder_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']close_orders.append(order_dict)tick3 = time.time()close_df = pd.DataFrame(close_orders)sma_result = cal_kpis(close_df = close_df, some_obj = sma_result)sma_result.get_data_duration = round(tick2 -tick1 , 3)sma_result.process_data_duration = round(tick3 -tick2 , 3)with Session() as session:session.add(sma_result)session.commit()# ========================= ema tick4 = time.time()open_orders = []close_orders = []for i in tqdm.tqdm(range(len(bt_df))):tem_dict = dict(bt_df.iloc[i])data_dt = tem_dict['data_dt']close = tem_dict['close']ema_t1 = tem_dict['ema_t1']ema_t2 = tem_dict['ema_t2']if len(open_orders) == 0:if ema_t1 >  ema_t2:# 金叉order_dict =  {}order_dict['but_dt'] = data_dtorder_dict['buy_price'] = closeopen_orders.append(order_dict)continueif len(open_orders):if  ema_t1< ema_t2: # 死叉for  item in reversed(open_orders):order_dict = open_orders.pop()order_dict['sell_dt'] = data_dtorder_dict['sell_price'] = closeorder_dict['gp'] = order_dict['sell_price'] - order_dict['buy_price']close_orders.append(order_dict)tick5 = time.time()close_df = pd.DataFrame(close_orders)close_df = pd.DataFrame(close_orders)ema_result = cal_kpis(close_df = close_df, some_obj = ema_result)ema_result.get_data_duration = round(tick2 -tick1 , 3)ema_result.process_data_duration = round(tick5 -tick4 , 3)with Session() as session:session.add(ema_result)session.commit()print('done')if __name__ =='__main__':rm = RabbitManager()# 获取1个数据data_list = rm.get_message_early_ack('smaema', count=1)if len(data_list):the_task = data_list[0]short_t = the_task['short_t']long_t = the_task['long_t']input_smaema = InputSMAEMA_Para(short_t = short_t ,long_t = long_t)process(input_smaema)

3 调度

最近也在研究一些简单又可靠的调度方法。

  • 1 cron: 简单是简单了,就是有很多缺陷。首先是只能到分钟级,另外没有任务的控制,反正到点就触发。这样如果是长耗时任务,很可能发太多撑爆内存。
  • 2 nohup + for(while) : 简单,但是不好控制worker的数量。
  • 3 nohup + apscheduler : 这个可以有。最初是打算docker + aps的,但那样要么都执行docker worker,要么就要管理挂载,比较麻烦。未来模式成熟了,可以考虑同时保留 systemd + aps 和docker + aps 两种简单的调度模式并存。
from datetime import datetime
import os 
from apscheduler.schedulers.blocking import BlockingSchedulerdef exe_sh(cmd = None):os.system(cmd)# 后台启动命令 nohup python3 aps.py >/dev/null 2>&1 &if __name__ == '__main__':sche1 = BlockingScheduler()# sche1.add_job(exe_sh,'interval', seconds=1, kwargs ={'cmd':'python3 ./main_handler/main.py'})sche1.add_job(exe_sh,'interval', seconds=10, kwargs ={'cmd':'python3 qtv200_0004_sma_ema_clickhouse.py'},max_instances=20,coalesce=True)print('[S] starting inteverl')sche1.start()

我启动了周期任务,10秒一次,最多允许20个任务实例,且超时任务最后会合并(为1次)。看到这种方法是满核调用的。
在这里插入图片描述

在这里插入图片描述
有意思的是,我问了下deepseek,如果没有骗我的话,aps默认是线程池。
在这里插入图片描述
一般来说,python是单进程~单线程的。如果在一个脚本内,一般来说不会突破一个核的限制。现在在调度模式下一个线程实际上触发了多个线程的工作,某种程度上说就绕过了这个限制。

4 结果

还挺理想的,8个核 ~ 20 worker,在睡一觉之后,大约完成了 10%的任务。我打算在另一个位置也启动一下worker。另外注意到租用机的数据读取时间是2秒,比我的主机慢了一倍。
在这里插入图片描述

总体上,本次实验应该算是成功了:

指标计算的提升:

  • 1 增加了盈亏比
  • 2 增加了胜率

架构和工具方面的提升:

  • 1 使用clickhouse,增强了worker处理速度
  • 2 使用ORM,灵活存储
  • 3 使用APS调度

下一步:

  • 1 对结果进行数据分析,例如聚类,以发现规律
  • 2 准备展开更多的标的的计算,暂时先考虑SMA/EMA策略
  • 3 将选出的策略与强化学习框架对接
  • 4 准备前端投射,实盘操作

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 记录|Label组件如何控制下边框为直线
  • 初识Linux系统,Ubuntu基础使用
  • 做题常用操作
  • PythonStudio 控件使用常用方式(二十七)TActionList
  • 探索Prompt调优的艺术:让AI更智能
  • 浅谈Sql Server 临时表
  • 执行docker compose命令出现 Additional property include is not allowed
  • 楼宇控制与智能家居实训室解决方案
  • linux系统编程:多任务编程(进程1)
  • 【Python快速入门和实践019】Python常用脚本-查看视频信息并压缩视频
  • C语言家教记录(六)
  • 【Qt】QWidget的toolTip属性
  • 函数递归那些事
  • MySQL基础练习题47-判断三角形
  • haproxy实验-2
  • “寒冬”下的金三银四跳槽季来了,帮你客观分析一下局面
  • 【Leetcode】104. 二叉树的最大深度
  • 002-读书笔记-JavaScript高级程序设计 在HTML中使用JavaScript
  • android 一些 utils
  • co模块的前端实现
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • Docker: 容器互访的三种方式
  • Druid 在有赞的实践
  • github从入门到放弃(1)
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • Octave 入门
  • Swift 中的尾递归和蹦床
  • 爱情 北京女病人
  • 分布式任务队列Celery
  • 简单实现一个textarea自适应高度
  • 跨域
  • 离散点最小(凸)包围边界查找
  • 数组的操作
  • 一些关于Rust在2019年的思考
  • Mac 上flink的安装与启动
  • 阿里云移动端播放器高级功能介绍
  • 回归生活:清理微信公众号
  • ​力扣解法汇总946-验证栈序列
  • ​什么是bug?bug的源头在哪里?
  • # 数仓建模:如何构建主题宽表模型?
  • ## 1.3.Git命令
  • #QT(TCP网络编程-服务端)
  • #WEB前端(HTML属性)
  • #window11设置系统变量#
  • #每日一题合集#牛客JZ23-JZ33
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • $分析了六十多年间100万字的政府工作报告,我看到了这样的变迁
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (八)Flask之app.route装饰器函数的参数
  • (翻译)Quartz官方教程——第一课:Quartz入门
  • (分布式缓存)Redis分片集群
  • (附源码)ssm考生评分系统 毕业设计 071114
  • (计算机网络)物理层
  • (一)、python程序--模拟电脑鼠走迷宫
  • (转) Face-Resources