当前位置: 首页 > news >正文

flink使用StatementSet降低资源浪费

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

相关文章:

  • 深入探讨JavaScript中的队列,结合leetcode全面解读
  • nvm安装以及idea下vue启动项目过程和注意事项
  • 华为仓颉编程语言
  • SOLID:软件系统设计的五个基本原则
  • [笔记] 高等数学在各工程门类的典型应用场景
  • adb push 报错 ...error: failed to copy...
  • 数据识别概述
  • Linux多进程和多线程(一)-进程的概念和创建
  • CSS Border(边框)
  • IO多路复用学习
  • c++习题09-分离整数的各个数
  • Python特征工程 — 1.3 对数与指数变换
  • 检测水管缺水的好帮手-管道光电液位传感器
  • 最新mysql打开远程访问和修改最大连接数
  • Python爬取国家医保平台公开数据
  • 【347天】每日项目总结系列085(2018.01.18)
  • canvas 绘制双线技巧
  • gf框架之分页模块(五) - 自定义分页
  • Java 网络编程(2):UDP 的使用
  • Java 最常见的 200+ 面试题:面试必备
  • Java,console输出实时的转向GUI textbox
  • js算法-归并排序(merge_sort)
  • node入门
  • vue2.0一起在懵逼的海洋里越陷越深(四)
  • vue的全局变量和全局拦截请求器
  • Vue官网教程学习过程中值得记录的一些事情
  • 微信支付JSAPI,实测!终极方案
  • 中文输入法与React文本输入框的问题与解决方案
  • ​软考-高级-系统架构设计师教程(清华第2版)【第15章 面向服务架构设计理论与实践(P527~554)-思维导图】​
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • ###C语言程序设计-----C语言学习(6)#
  • #define MODIFY_REG(REG, CLEARMASK, SETMASK)
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • #我与Java虚拟机的故事#连载08:书读百遍其义自见
  • (003)SlickEdit Unity的补全
  • (4.10~4.16)
  • (BFS)hdoj2377-Bus Pass
  • (NSDate) 时间 (time )比较
  • (PWM呼吸灯)合泰开发板HT66F2390-----点灯大师
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)ssm高校社团管理系统 毕业设计 234162
  • (接口封装)
  • (没学懂,待填坑)【动态规划】数位动态规划
  • (十一)图像的罗伯特梯度锐化
  • (顺序)容器的好伴侣 --- 容器适配器
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (一)RocketMQ初步认识
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • .NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库
  • .net php 通信,flash与asp/php/asp.net通信的方法
  • .net 生成二级域名
  • .Net8 Blazor 尝鲜
  • .NET开源的一个小而快并且功能强大的 Windows 动态桌面软件 - DreamScene2
  • .net快速开发框架源码分享
  • .Net通用分页类(存储过程分页版,可以选择页码的显示样式,且有中英选择)