当前位置: 首页 > news >正文

fastapi+mongo+qlib:体系化构建AI量化投研平台

百天计划之第34篇,关于“AI量化投资研究平台”建设。

从今天开始要开始一条主线——就是开始搭建整个投研平台

如果说8月开始是知识点的梳理,一些基础技术的准备(以qlib和机器学习为核心),9月开始重点是“以解决真正的投资决策问题”为目标

切入点是可转债

当下A股市场持续下行,在市场总体下行的时候,讲真的策略不是最重要的。

对于新手而言,可转债和优秀指数基金,永远是最好的标的,会让你感受到,投资其实未必有那么大的风险(提示:投资有风险)。

01 架构

如下是一个极简的AI投研架构,极简意味着非核心部件都采用最简单的组件。

定时任务没有使用dagster或者airflow,因为这意味着需要引入一套全新的技术栈,需要人力云独立运维,我们使用fastapi内置的异步task。

持久化的中间件选mongo为主,并兼容qlib数据库,便于查询与并行计算。

后端使用fastapi——由于要深度使用mongo,则弃用django,而在flask与fastapi之间,考虑到我们使用api的场景偏多,考虑对api更加友好的fastapi框架,性能也会更好。

02 价值点

重点之核心在于“为谁解决什么问题”。

“半个成品远胜于一个半成品”。

目标远大很好,但饭要一口口吃,月亮虽好,六便士也很重要。

投研平台价值点就是辅助用户投资决策效率,提升胜率

面对可转债400多支,如何快速纵览数据历史,有什么规律,方便我们去分析。内置量化好的指标,对全市场进行透视,挖掘,聚类,排序。在此基础上定制一些策略,回测并归因等等。

总之,面对实用编程。

03 后端基础框架fastapi

fastapi是一个现代,异步python框架,官网如下:

https://fastapi.tiangolo.com/zh/

安装非常轻量:

pip install fastapi

pip install "uvicorn[standard]" ——安装运行的服务器。

hello FastApi:

from typing import Union
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():
    return {"Hello": "基于Fastapi的AI量化投研平台!"}

是不是非常简单,大家可能会想起Flask,是的。

Fastapi还内置精美的swagger UI,

让我们调试接口非常方便,这也是我优选Fastapi框架的原因之一。

04 基于FastApi的定时任务

传统的定时任务一般是apscheduler,它功能强大,但说实话,并不好用。

我们的定时任务其实比较简单,就是定时去做一些事情,不需要特别精确,一般就是收盘后要去更新数据,然后做一些指标的预计算,或者数据同步之类的操作。

fastapi本身是异步框架,内置了定时任务的功能。

import asyncio
from loguru import logger
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
    *,
    seconds: float,
    wait_first: bool = False,
    raise_exceptions: bool = False,
    max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    '''
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    '''
    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        '''
        将修饰函数转换为自身重复且定期调用的版本.
        '''
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False
        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0
            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)
            ensure_future(loop())
        return wrapped
    return decorator

我们基于异步功能来实现一个装饰器repeat_task。

一次封装,再使用就无比简洁了:

@app.on_event('startup')
@repeat_task(seconds=6, wait_first=True)
def repeat_task_aggregate_request_records() -> None:
    logger.info('触发重复任务: 聚合请求记录')
 

05 可转债数据

我们需要规划一下数据库表名字:

债券:bond_, 股票:stock_, 基金:fund_,不带复数s。

基础信息:_basic, 日线:_daily,财务指标 _fin

一般而言,任何一个交易品种,我们都需要先入库两种数据:

一是基础信息列表,二是交易日频数据。

基本信息只需要构建一次,若有变动不定期手动刷新即可:

def build_cb_basic():
    df = get_cb_basic()
    df['_id'] = df['ts_code']
    write_df('bond_basic', df, drop_tb_if_exist=True)

可以看出,一共A股历史上发行了799支可转债,目前可交易的400多支。

交易数据,这是需要每天收盘后去增量更新的,我们的逻辑是如果没有数据,从19900101开始读,如果有数据,则从当前日期增量更新即可。

mongo的优点可以自动忽略_id相同的行,所以特别方便。

def update_all_bond_daily():
    # 获取所有列表,有日期,从最近的日期开始读。
    items = list(get_db()['bond_basic'].find({}, {'ts_code': 1, '_id': 0}))
    if items and len(items) == 0:
        logger.error("读可转债列表为空")
        return
    for i, item in enumerate(items):
        code = item['ts_code']
        logger.debug("{}-{}-{}".format(i, code, i / len(items)))
        date = get_daily_last_date(code)
        df = get_bond_daily(code, date)
        df['_id'] = df['ts_code'] + '_' + df['trade_date']
        print(df.tail())
        write_df('bond_daily', df)
        break

小结一下:

我们开始体系化构建整体平台。

后端框架是fastapi,从fastapi内置的定时任务开始,构建可转债的基本数据和日频数据。

qlib因子分析之alphalens源码解读

基于alphalens对qlib的alpha158做单因子分析

人生B计划,不确定时代的应对之道

飞狐,科技公司CTO,用AI技术做量化投资;以投资视角观历史,解时事;专注个人成长与财富自由。

相关文章:

  • ClickHouse学习笔记之表引擎
  • 智能家居生态:华为、小米各异
  • arx 读入块表
  • 避孕套、安全套分类|常识|图文指南(杜蕾斯、冈本、杰士邦)
  • postgres源码解析 SysLogger辅助进程
  • MySql ocp认证之数据导入导出(五)
  • 论文阅读(10) 基于吸力的推进是动物高效游泳的基础(2015)
  • 《大数据之路:阿里巴巴大数据实践》-第2篇 数据模型篇 -第10章 维度设计
  • 【java】基础内容(4)
  • Perfetto分析进阶
  • Framework入门のPiex 6P源码(下载/编译/刷机)
  • 高端手机市场的诸神之战,vivo举起一把“雷神之锤”
  • 简单的ajax任务:get和post方式提交前端用户输入信息给服务器
  • Qt5开发从入门到精通——第四篇(调色板)
  • MySQL数据库如何线上修改表结构
  • Apache Zeppelin在Apache Trafodion上的可视化
  • Create React App 使用
  • es6
  • ESLint简单操作
  • HTTP 简介
  • iOS 颜色设置看我就够了
  • Javascript基础之Array数组API
  • Java深入 - 深入理解Java集合
  • Java小白进阶笔记(3)-初级面向对象
  • jquery cookie
  • JSDuck 与 AngularJS 融合技巧
  • Python打包系统简单入门
  • Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel
  • sublime配置文件
  • Yeoman_Bower_Grunt
  • 从重复到重用
  • 技术:超级实用的电脑小技巧
  • 每天一个设计模式之命令模式
  • 前端相关框架总和
  • 入门到放弃node系列之Hello Word篇
  • 深度学习入门:10门免费线上课程推荐
  • 延迟脚本的方式
  • TPG领衔财团投资轻奢珠宝品牌APM Monaco
  • ​MySQL主从复制一致性检测
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • ​一帧图像的Android之旅 :应用的首个绘制请求
  • #我与Java虚拟机的故事#连载09:面试大厂逃不过的JVM
  • (附源码)ssm本科教学合格评估管理系统 毕业设计 180916
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (四)linux文件内容查看
  • (转)创业的注意事项
  • ./configure、make、make install 命令
  • .NET 8.0 中有哪些新的变化?
  • .Net CoreRabbitMQ消息存储可靠机制
  • .Net 高效开发之不可错过的实用工具
  • .NET 中的轻量级线程安全
  • .Net+SQL Server企业应用性能优化笔记4——精确查找瓶颈
  • .NET关于 跳过SSL中遇到的问题
  • @拔赤:Web前端开发十日谈
  • [ C++ ] 继承