当前位置: 首页 > news >正文

第二届Apache Flink极客挑战赛冠军比赛攻略_SkyPeaceLL队

关联比赛:  第二届 Apache Flink极客挑战赛

赛题介绍

  • 指定的数据集
    一、病例行动数据集
  1. 病例历史行动数据集(训练集1) 1M+
  2. 确诊病例数据 (测试集1) 500+
  3. 实时病例行动数据集(测试集2) 1000+
    二、天猫精灵行为数据集
  1. 天猫精灵历史行为数据集(训练集2) 1M+
  2. 用户行为数据集(测试集3) 500+
  3. 实时用户行为数据集(测试集4) 1000+
  • 四个任务
  1. 根据测试集1每条数据的特征向量,在训练集1中找出该病例(人)对应的所有记录。
  2. 对测试集2的每条数据,根据其特征向量进行实时分类(人)。
  3. 根据测试集3每条数据的特征向量,在训练集2中找出该用户行为(领域+意图)对应的所有记录。
  4. 对测试集4的每条数据,根据其特征向量进行实时分类(领域+意图)。
  • 性能要求
  1. Job总运行时间不能超过3小时。
  2. 对每条实时数据完成实时分类的响应时间不能超过500ms。
  • 平台和组件
    Flink,PyFlink,Flink ai_flow,达摩院 Proxima,Intel Zoo cluster serving

解决方案 - Workflow

enter image description here

解决方案 - Workflow config file

本方案中,代码框架设定了一个目标:对于新增的相似应用的数据集,不修改python代码,只需新增一个workflow config file(yaml文件),根据新数据集的基本属性以及数据结构的特性设置相应的配置即可。
/code/package/wf_config_1.yaml --病例行动数据集对应的配置文件

WORKFLOW_NO: 1
KAFKA_SOLUTION_NO: 2    #1: Use TABLE API for kafka. 2: Use Kafka API
CLUSTER_SERVING_PARALLELISM: 16
PREDICT_WITH_TRAIN_PARALLELISM: 50
INPUT_DIM: 512
ENCODING_DIM: 128
LEARNING_RATE: 0.001
EPOCHS: 1
#MODEL_TYPE: SIMPLE_AUTOENCODER, DEEP_AUTOENCODER
MODEL_TYPE: SIMPLE_AUTOENCODER
MODEL_NAME: camera_auto_encoder
data_set_dir: /tcdata/
output_dir: /opt/result/
train_data_file: train_data.csv
feature_column_no: 3
train_predict_result_dir: camera_predict_result
…
…
create_training_table_sql: create table training_table(uuid varchar,face_id varchar,device_id varchar,feature_data varchar) with ('connector.type' = 'filesystem','format.type' = 'csv','connector.path' = '{}','format.ignore-first-line' = 'false','format.field-delimiter' = ';')
create_merge_table_sql: create table merge_table(uuid varchar,
…

/code/package/wf_config_2.yaml --天猫精灵行为数据集对应的配置文件

WORKFLOW_NO: 2
KAFKA_SOLUTION_NO: 2    #1: Use TABLE API for kafka. 2: Use Kafka API
CLUSTER_SERVING_PARALLELISM: 16
PREDICT_WITH_TRAIN_PARALLELISM: 64
INPUT_DIM: 700
ENCODING_DIM: 128
LEARNING_RATE: 0.001
EPOCHS: 1
#MODEL_TYPE: SIMPLE_AUTOENCODER, DEEP_AUTOENCODER
MODEL_TYPE: SIMPLE_AUTOENCODER
MODEL_NAME: genie_auto_encoder
data_set_dir: /tcdata/
output_dir: /opt/result/
train_data_file: genie_data.csv
feature_column_no: 2
train_predict_result_dir: genie_predict_result
…
…
create_training_table_sql: create table training_table(uuid varchar,action_id varchar,feature_data varchar) with ('connector.type' = 'filesystem','format.type' = 'csv','connector.path' = '{}','format.ignore-first-line' = 'false','format.field-delimiter' = ';')
create_merge_table_sql: create table merge_table(uuid varchar,
…

解决方案 - Data pre-processing

  • 病例行动数据集
    不含异常数据,且特征向量已经L2 Normalization,不需要特别的预处理。
  • 天猫精灵行为数据集
    存在一些异常数据,需要做以下预处理:
  1. 移除某些特征向量数据末尾多出的空格(注:如果不做相应处理,score3通常得0分)
  2. Re-generate UUID for duplicated UUID
  3. Processing zero vector
  4. Processing duplicated vector
  5. L2 Normalization

解决方案 - Model training

  • Model
  1. Simple AutoEncoder (实测效果好,稳定,性能好,采用)
  2. Deep AutoEncoder(实测效果好,性能一般,最终未采用)
  3. VAE (Variational AutoEncoder) (实测效果相对较差,未采用)
  4. PCA (Principal Component Analysis) (实测效果相对较差,未采用)
  5. NMF (Non-negative matrix factorization) (实测效果相对较差,未采用)
    模型关键参数
  1. 损失函数:MSE
  2. 激活函数:linear
    降维的维度选择
  1. 病例行动数据集:512=>128
  2. 天猫精灵行为数据集:700=>128

解决方案 - Inference

  • Intel Zoo Cluster Serving
  1. 支持Tensorflow Saved Model 以及PyTorch Model for Inference
  2. 支持并发Inference(本赛题设置为16个并发),在多并发下运行稳定
  3. 模型针对CPU做了优化,无需GPU环境
  4. 自动生成配置,方便部署
  5. 响应时间短。平均每个请求响应时间实测小于35ms,充分满足本方案中的性能需求。

向量索引和向量检索

  • 阿里达摩院proxima
  1. 使用Proxima HnswBuilder 创建索引,使用HnswSearch search vector
  2. 支持海量数据向量检索
  3. 召回率高,Top100 召回率超过98.5%
  4. 检索性能高,在本赛题中,平均每个请求(TopK=1024)的响应时间小于3ms,完全满足TopK筛选+再聚类这样类型的应用需求,对于实时的向量检索也毫无压力。

解决方案 - 聚类算法

  • 针对历史行动数据的聚类
  1. 根据指定要search的vector,Topk=1024,通过Proxima search 出1024个UUIDs只是作为初步筛选的UUIDs。再使用Pandas查出它们对应的vectors,然后将这些vectors和指定search的那个vector合并为1025个vectors。
  2. 使用聚类算法Chinese_Whispers,对上述1025个vectors进行聚类分组后,取出和指定vector属于相同组的所有vectors所对应的UUIDs输出。Chinese_Whispers算法对于病例行动数据集效果最好。
  3. 另外尝试了K-Means,发现基本不可行。尝试了DBSCAN聚类算法,可行,但效果不如Chinese Whispers。
  4. 针对天猫精灵的数据集,还尝试了一种简单算法:Topk=128,从search出的128个UUIDs中根据result.score(),设置一个阈值,将result.score()<阈值的UUIDs全部输出。该实际评分效果要略微优于Chinese_Whispers。
  5. 其它一些常见的聚类算法以及较新的GCN(Graph Convolutional Network),因时间关系,计划赛后继续尝试。
  • 针对实时行动数据的实时分类
  1. 根据指定的vector,search Top1 UUID。
  2. 直接使用Top1 UUID 作为分类label输出。(注:用此分类方法+合适的模型+online data处理不超时,score2可得满分500分)

聚类算法 - Chinese_Whispers

  • 算法流程
    1 初始化:将所有的样本点初始化为不同的类。
    2 建图:构建无向图,以每个节点为一个类别,不同节点之间计算相似度,当相似度超过threshold,将两个节点相连形成关联边,边的权重为相似度。
    3 迭代:
    3.1 随机选取一个节点i开始,在其相连节点中选取边权重最大者j,并将i归为节点j类(若相连节点中有多个节点属于同一类,则将这些权重相加再做比较)
    3.2 遍历所有节点后,重复迭代至满足迭代次数。

    enter image description here

解决方案 – 解决Online Data (Kafka) 超时的问题

  • 方法一、使用ai_flow 内建的算子
    1 使用ai_flow.read_example、ai_flow.predict、ai_flow.transform和ai_flow.write_example
    2 在其中的SourceExecutor/SinkExecutor实现类中使用PyFlink TABLE API(For Kafka) 读/写Kafka Topic
    3 为相应Flink job的StreamExecutionEnvironment设置参数:stream_env.enable_checkpointing(250)
    该参数默认为3000ms,3000ms会导致每3秒才集中从Kafka Topic中读出6条数据。所以,如果不设置这个参数,必定会导致每6条数据中平均有5条会超时500ms,使得实时数据(score2和score4)得分很难超过100分(满分500分),因此必须改变这个参数设置。针对本赛题,可以设置为250ms。
    方法一在产线上应用没什么问题,但是在本比赛中它有一个小问题,那就是初始会有8秒延迟,这个延迟会使得赛题程序开始发送的约16条数据被TABLE API(For Kafka)读到时都会超时500ms,从而对最终评分有所影响(实测大概影响6分左右)。
    使用方法一可确保只会有少量的初始数据(实测16条左右)产生超时。
  • 方法二、使用ai_flow的用户自定义算子
    1 ai_flow支持更为灵活的用户自定义算子af.user_define_operation
    2 在用户自定义算子的Executor实现类中,直接使用Kafka Consumer/Producer 读写Kafka Topic
    3 直接通过 Kafka consumer从Kafka Topic读取数据,然后call Inference API (by Zoo cluster serving) 降维,然后使用Proxima search API search Top1 UUID,然后得出分类label,最后直接通过Kafka Producer 将结果数据写入Kafka Topic。
    使用方法二可避免初始16条数据的超时问题,设置好关键参数 (如fetch_max_wait_ms=200),可确保所有数据都不会超时。

总结和感想

本次比赛是算法+工程化问题。既要设计好算法,又要考虑实际工程需要。
模型算法:如果数据集的特征向量已经经过很好的处理,那么降维模型的模型可选择MSE loss损失小且Inference性能高的模型即可。而聚类算法要根据数据集特征向量的特性选择合适的算法。
向量检索:对于海量数据,必须使用专门的向量检索组件。阿里proxima提供了高召回率且极短的响应时间。
并行Inference:如果服务器只有CPU环境,那么使用Intel Zoo Cluster Serving 为Inference提供并行服务目前是非常好的选择。
实时数据处理:充分利用Zoo Cluster Serving、Proxima 的性能优势以及ai_flow user defined operation 的灵活优势,将实时数据处理效果最佳化。
代码框架:针对相似应用的新数据集,只需给出相应的新配置文件即可,无需改动python code。
工程考虑:代码不仅考虑比赛得分效果,同时也考虑了通过配置的方式在不同应用场景下使用不同的实现。
生产价值:一些基于向量检索的应用具有相似性,在这样的思路和不断改进下,它们应该可以泛化成通用的应用架构和代码框架,最终或许也可以实现为某一类软件产品或平台。另外,Zoo Cluster Serving 以及Proxima 在无昂贵GPU仅有CPU的环境下,提供了高并发及高性能的特性。因此,充分使用了Zoo Cluster Serving 以及Proxima的解决方案在一些实际生产系统的最终方案选择中,将具备很强的竞争力。

查看更多内容,欢迎访问天池技术圈官方地址:第二届Apache Flink极客挑战赛冠军比赛攻略_SkyPeaceLL队_天池技术圈-阿里云天池

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • VirtualBox 7.1.0 发布下载 - 开源跨平台虚拟化软件
  • 大数据和代理:揭示它们之间的微妙联系
  • STM32 + W5500 实现HTTPS !
  • 从Profinet到Ethernet IP网关技术重塑工业网络,数据传输更流畅
  • Vue3 父组件向子组件传值:异步数据处理的显示问题
  • MiniDB 使用手册
  • LIN总线CAPL函数——校验和段(Checksum)测试(linGetChecksum)
  • QT事件用法详解
  • 【网络安全 | 代码审计】JFinal之DenyAccessJsp绕过
  • GBase8c主备版500升级步骤
  • 基于R语言的统计分析基础:使用键盘输入数据
  • 重新认识一下JNIEnv
  • RFID技术实现消防物资消防车无感化智能管理设计方案
  • ECMAScript与JavaScript的区别
  • 【leetcode】树形结构习题
  • JS 中的深拷贝与浅拷贝
  • 【Leetcode】104. 二叉树的最大深度
  • 002-读书笔记-JavaScript高级程序设计 在HTML中使用JavaScript
  • Apache Zeppelin在Apache Trafodion上的可视化
  • Bytom交易说明(账户管理模式)
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • IndexedDB
  • JavaScript/HTML5图表开发工具JavaScript Charts v3.19.6发布【附下载】
  • JavaScript函数式编程(一)
  • Java小白进阶笔记(3)-初级面向对象
  • js递归,无限分级树形折叠菜单
  • LeetCode算法系列_0891_子序列宽度之和
  • Magento 1.x 中文订单打印乱码
  • Mysql优化
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • use Google search engine
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 设计模式走一遍---观察者模式
  • 微信如何实现自动跳转到用其他浏览器打开指定页面下载APP
  • 想写好前端,先练好内功
  • 新手搭建网站的主要流程
  • 一些css基础学习笔记
  • ​LeetCode解法汇总2182. 构造限制重复的字符串
  • ​油烟净化器电源安全,保障健康餐饮生活
  • # 数仓建模:如何构建主题宽表模型?
  • #{} 和 ${}区别
  • (四)c52学习之旅-流水LED灯
  • (贪心) LeetCode 45. 跳跃游戏 II
  • (转载)微软数据挖掘算法:Microsoft 时序算法(5)
  • *2 echo、printf、mkdir命令的应用
  • *上位机的定义
  • . NET自动找可写目录
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)
  • .net图片验证码生成、点击刷新及验证输入是否正确
  • .Net下的签名与混淆
  • .NET序列化 serializable,反序列化
  • .Net中间语言BeforeFieldInit
  • .php文件都打不开,打不开php文件怎么办
  • @Autowired @Resource @Qualifier的区别