fastapi定时任务,增量构建可转债交易数据入mongo和qlib
百天计划之第35篇,关于”AI量化投资研究平台“搭建。
今天继续可转债日频交易数据的构建。
01 取bond_daily表里的最新日期
交易数据是需要每天更新的,一个简单的办法是查询上次更新的日期,然后从这一天开始更新。
这种设计有一个好处是,可以把其中某一个标的数据清空后,我们的程序会从头开始构建数据。
这里的逻辑比较简单,特定的daily表中,倒序查trade_date字段,如果没有查到记录,那么取”19900101“。
def get_daily_last_date(code, tb_name, code_field='ts_code', date_field='trade_date'): # 查询最新date items = get_db()[tb_name].find({code_field: code}, {'_id': 0, date_field: 1}).sort(key_or_list=date_field, direction=pymongo.DESCENDING).limit( 1) items = list(items) if items and len(items) > 0: start_date = items[0][date_field] else: start_date = '19900101' return start_date
02 把日频数据更新加到定时器
我们定时更新的频率是24小时,在收盘之后更新即可。
昨天贴过”异步定时器“的代码,其实比较简单,就是添加一个持续运行的异步任务,sleep(seconds),如果当前函数在执行,就不重复执行,否则到点会持续触发。
seconds = 24 * 60 * 60 # 24小时*60分钟*60秒 @app.on_event('startup') @repeat_task(seconds=seconds, wait_first=False) def repeat_task_update_bond_daily() -> None: logger.info('触发重复任务: 更新转债daily数据。====================================================') update_all_bond_daily()
03 把数据构建到qlib数据库
y也许有人会问了,daily数据都进入到mongo数据库了,为何还需要qlib数据库。
这里有一个场景:
查询2020年9月3号,MACD指标大于0的转债列表。
这个查询若是基于mongo,需要把所有转债数据读出,计算所有转债的MACD指标,然后筛选MACD>0,可以做到,但性能不高,每天查询都需要大量的计算。
qlib从本地文件加载数据,多线程并发同时有cache机制。
由qib数据库来支撑这种临时性的查询,性能会好很多。
mongo到qlib分两步走:
初次构建:
1、把数据按code下载到本地code.csv。
2、使用qlib的dump工具构建。
更新:
与mongo建库不同在于,如果每天去qlib里查最近的日期比较麻烦,我们就是按日期统一查询增量数据。
1、一次把增量数据读出,groupby(code)后存成csv。
2、使用qlib的update dump工具更新。
路径规划,使用pathlib很方便:
from pathlib import Path DATA_DIR = Path(__file__).parent.parent.joinpath("data") DATA_DIR_QLIB = DATA_DIR.joinpath('qlib_data') DATA_DIR_QLIB_BOND = DATA_DIR_QLIB.joinpath('bond') DATA_DIR_CSV = DATA_DIR.joinpath('csv') DATA_DIR_CSV_BOND = DATA_DIR_CSV.joinpath('bond') dirs = [DATA_DIR, DATA_DIR_QLIB, DATA_DIR_QLIB_BOND, DATA_DIR_CSV, DATA_DIR_CSV_BOND] for dir in dirs: dir.mkdir(exist_ok=True, parents=True)
可以在daily更新之后,再统一update一次qlib的增量数据。
def update_all(start_date): def to_csv(x): code = list(x['ts_code'])[0] print(code, x) x.to_csv('{}/{}.csv'.format(DATA_DIR_CSV_BOND_UPDATE, code), index=False) items = list(get_db()['bond_daily'].find({'trade_date': {'$gte': start_date}}, {'open': 1, 'high': 1, 'low': 1, 'close': 1, 'vol': 1, 'trade_date': 1, 'amount': 1, 'ts_code': 1, '_id': 0})) df = pd.DataFrame(items) df.rename(columns={'trade_date': 'date', 'vol': 'volume'}, inplace=True) print(df) df.groupby('ts_code').apply(lambda x: to_csv(x)) dump = DumpDataUpdate(csv_path=DATA_DIR_CSV_BOND_UPDATE, qlib_dir=DATA_DIR_QLIB_BOND) dump.dump()
04 额外数据
转债的入门级且有效的策略是”双低轮动“策略。
双低转债具体是指:价格低、溢价率低。
策略背后的逻辑:
价格低,转债的”债性“强,正股下跌的时候抗跌;
转股溢价率低,转债的”股性“强,正股上涨的时候能跟得上。
双低值=可转债价格+转股溢价率*100。按这个又低值从低到高即可。
需要做一些排除:
一年内到期,因为这相当是一个短期债,价值有限。
已经公告强赎,强赎就是人家已经要强制回收或者强制转股了,没有投资的价值。
转债的价格已经有了。
转股溢价率=可转债面值÷转股值-1。
正股价,正股价还有复权因子,还需要转股价,转股价不是季频,也不是月频,一支转债生命周期里也就”下修“那么两三次。