当前位置: 首页 > news >正文

基于Flink的流式计算可视化开发实践之配置->任务生成->任务部署过程

1. 引言

在我们大数据平台(XSailboat)的DataStudio模块中实现了基于Hive的业务流程开发和基于Flink的实时计算管道开发。

DataStudio是用来进行数据开发的,属于开发环境,另外还有任务运维模块,负责离线分析任务和实时计算任务在生产环境的部署和运维。在开发环境开发好的业务流程和计算管道可以提交/发布到生产环境。
DataStudio
整个大数据平台的可视化开发其实都是一种配置驱动的思想。在界面上开发编辑的都是一种配置数据,在部署运行的时候,后台会有程序将其转为可执行程序或解释执行配置。

2. Flink计算任务的配置化实现

在我的另一篇文章《Flink的DAG可视化开发实践》中表述了我们对Flink任务的一种模式抽象思路。有了这种模式抽象之后,让Flink计算任务适合于可视化开发,即把一种完全自由的代码开发模式,转变成了一种“输入”、“输出”、“状态存储器”、“个性化配置”、“基础配置”、“前置处理”、“后置处理”等阶段性、部件型的可配置项。

Flink有很多的算子,在原来的算子特性的基础之上,套用模式,并且对它的功能进行一些个性化设定,就可以形成自己的界面开发用的算子及配置。

例如“SQL增量查询”,就是我们自定义实现的一种在支持JDBC和SQL的数据库上,对一个SQL查询,可以基于一个具有增长特性的字段(例如自增长的字段或者最近编辑时间,记录型数据的创建时间等)进行定时增量查询和读取的源节点。
它的个性化配置界面如下:
在这里插入图片描述
这个节点的配置面板中的各个配置项暂且不介绍。从界面中可以看出SQL增量查询的功能配置可以转化成一个多层次结构的Bean。这个Bean是“SQL增量查询”节点实例信息的一个组成部分。
如此,一个计算管道DAG图,就可以用“计算管道(图)–>算子节点–>节点配置”这样的多层数据结构,并映射到成多张关系数据库表,并在其中存储。
这样就实现了算子的配置化及配置存储。

3. 配置转成Flink计算任务

既然是配置驱动的程序,基本都是“配置+解释执行器”的构成方式。配置是多样的,而解释执行器就一种实现。

Flink的计算任务开发,都会开发至少一个带main函数的Java类,在部署运行的时候上传jar包并指定main函数所在Java类。

一般的Java程序,也会开发一个带main函数的类,在MANIFEST.MF文件中指定或在命令行里面指定,这样JVM就知道程序的入口。Flink的计算任务也类似,只是不是用来告诉JVM的,而是用来告诉JobManager的。JobManager从这里进去,执行里面的代码,构建出计算任务图(有多阶段的图,可以不用细究)。构建出图之后,再将其拆分交给一个或多个TaskManager执行。

所以我们的main函数里面的逻辑只是构建了一个计算任务以及每一个该怎么执行的执行流图。这和反应式编程很类似,先是构建计算管路,再塞入数据执行。这和我们一般的函数主动式调用有所不同,那是调用即执行。

既然Flink的Job的main函数里面是构建计算管路,那么我们按照计算管道的DAG图及其配置,生成计算管路即可。即一手解读配置,一手按配置构建算子,组成计算任务。

我们界面上定义的每一个节点,都有专门的构建器,将节点转成相应的算子。所以配置转计算任务的过程是:

1. 提交计算任务的时候,通过参数指定运行的是那个计算管道。因为在平台里面有很多工作空间,每个工作空间里面有许多计算管道。
2. Flink的JobManager运行“执行解释器”的jar,进去其main函数。
3. 在main函数中,获取相关入参,其中就有计算管道id,然后调用其它服务提供的通过id获取计算管道及其配置信息的接口,获取计算管道的详细信息。
4. 解析计算管道详细信息,构建计算任务,将计算管道中的每个算子配置信息转换成Flink的算子。

下面贴出上面例举的SQL增量查询节点的构建器,以便更好理解我们是怎么做的。

... 省略
public class SI_SQLIncQuery_Builder extends StreamSourceNodeBuilder
{@Overridepublic CPipeNodeType getNodeType(){return CPipeNodeType.SI_SQLIncQuery ;}@Overridepublic void buildStreamFlow(JSONObject aNodeJo, IStreamFlowBuilder aStreamFlowBuilder , WorkContext aCtx) throws Exception{String nodeId = aNodeJo.optString("id") ;String nodeName = aNodeJo.optString("name") ;StreamExecutionEnvironment env = aStreamFlowBuilder.getExecutionEnvironment() ;JSONObject execConfJo = aNodeJo.optJSONObject("execConf") ;JSONObject baseConfJo = aNodeJo.optJSONObject("baseConf") ;String dsId = execConfJo.optString("dataSourceId") ;JSONObject dsJo = aNodeJo.pathJSONObject("dataSources", dsId);ConnInfo connInfo = JacksonUtils.asBean(dsJo.toJSONString() , ConnInfo.class) ;DataSource ds = new DataSource() ;ds.setId(dsId) ;ds.setName(dsJo.optString("name")) ;ds.setType(dsJo.optEnum("dataSourceType" , DataSourceType.class)) ;WorkEnv workEnv = WorkEnv.valueOf(aCtx.getWorkEnv()) ;if(WorkEnv.dev == workEnv)ds.setDevConnInfo(connInfo) ;elseds.setProdConnInfo(connInfo) ;// 查询密码WorkContext ctx = aStreamFlowBuilder.getWorkContext();KeyPair keyPair = RSAKeyPairMaker.getDefault().newOne().getValue();HttpClient client = aCtx.getGatewayClient();String cipherText = client.askForString(Request	.GET().path(IApis_Gateway.sGET_DataSourcePassword).queryParam("env", ctx.getWorkEnv()).queryParam("id", dsId).queryParam("publicKey", RSAUtils.toString(keyPair.getPublic())).queryParam("usage", "TDengine类型的下沉节点[" + nodeName + "]"));String password = RSAUtils.decrypt(keyPair.getPrivate(), cipherText);((ConnInfo_Pswd)connInfo).setPassword(password);int periodMs = execConfJo.optInt("periodMs") ;List<String> storeStateFields = execConfJo.optJSONArray("storeStateFields").toCollection(CS.arrayList() , XClassUtil.sCSN_String) ;DatasetDescriptor dsDesc = JacksonUtils.asBean(execConfJo.optJSONObject("dataset").toJSONString() , DatasetDescriptor.class) ;Dataset dataset = new Dataset() ;dataset.setDatasetDescriptor(dsDesc) ;dataset.setName(nodeName) ;dataset.setDataSourceId(ds.getId()) ;dataset.setWorkEnv(workEnv) ;dataset.setDataSourceType(ds.getType()) ;JSONArray outRowFieldsJa = execConfJo.optJSONArray("outRowFields") ;Assert.notNull(outRowFieldsJa , "没有找到outRowFields!%s" , execConfJo);ERowTypeInfo rowTypeInfo = JSONKit.toRowTypeInfo(outRowFieldsJa) ;// 水位线设置待实现WatermarkStrategy<Row> watermarkStrategy = null ;WaterMarkGenMethod waterMarkGenMethod = execConfJo.optEnum("waterMarkGenMethod" , WaterMarkGenMethod.class) ;if(waterMarkGenMethod == null){watermarkStrategy = WatermarkStrategy.noWatermarks() ;waterMarkGenMethod = WaterMarkGenMethod.NoWatermarks ;}else{switch(waterMarkGenMethod){case NoWatermarks :	{watermarkStrategy = WatermarkStrategy.noWatermarks() ;}break ;case MonotonousTimestamps:watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps() ;break ; case BoundedOutOfOrderness:JSONObject waterMarkGenMethodConfJo = execConfJo.getJSONObject("waterMarkGenMethodConf") ;TimeUnit timeUnit = TimeUnit.valueOf(waterMarkGenMethodConfJo.optString("timeUnit")) ;int timeLen = waterMarkGenMethodConfJo.optInt("timeLen" , 0) ;Assert.isTrue(timeLen>0 , "时间长度必须大于0!") ;Duration duration = null ;switch(timeUnit){case NANOSECONDS:duration = Duration.ofNanos(timeLen) ;break ;case MILLISECONDS:duration = Duration.ofMillis(timeLen) ;break ;case SECONDS:duration = Duration.ofSeconds(timeLen) ;break ;case MINUTES:duration = Duration.ofMinutes(timeLen) ;break ;case HOURS:duration = Duration.ofHours(timeLen) ;break ;case DAYS:duration = Duration.ofDays(timeLen) ;break ;case MICROSECONDS:duration = Duration.of(timeLen, ChronoUnit.MICROS) ;break ;}watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness(duration) ;break ;case MaxWatermarks:watermarkStrategy = WatermarkStrategy.forGenerator(new MaxWaterMarkGenSupplier()) ;break ;default:throw new IllegalStateException("未支持的水位线生成方法:"+waterMarkGenMethod) ;}}String timestampExpr = execConfJo.optString("timestampExpr") ;if(XString.isNotEmpty(timestampExpr) && waterMarkGenMethod != WaterMarkGenMethod.NoWatermarks){watermarkStrategy = watermarkStrategy.withTimestampAssigner(new ExprTimestampAssigner(aCtx.getPipeArgs() , timestampExpr, rowTypeInfo)) ;}mLogger.info("水位线生成策略是:{} , 时间表达式是:{}" , waterMarkGenMethod , timestampExpr) ;SQLIncQuerySourceFunction sourceFunc = new SQLIncQuerySourceFunction(storeStateFields, periodMs, dataset, ds) ;SingleOutputStreamOperator<Row> dss = env.addSource(sourceFunc , nodeName , rowTypeInfo).assignTimestampsAndWatermarks(watermarkStrategy)		// 2023-01-08 这一句是必需的,否则不会产生水位线.name(nodeName).uid(nodeId);int parallelism = baseConfJo.optInt("parallelism", 1) ;if(parallelism <=0 )mLogger.info("指定的并发度为 {} , 小于1,将不设置,采用缺省并发度。" , parallelism) ;else{dss.setParallelism(parallelism) ;}aStreamFlowBuilder.putFlowPoint(nodeId , dss);}
}

3. 计算任务的部署

我们要构建的是一套可视化开发、部署平台,在我们的界面上就能完成开发、调试、部署的过程。我们的大数据平台底层基础设施有Hadoop,所以我们考虑使用Hadoop Yarn的容器部署Flink集群。要使用Yarn容器部署Flink计算任务,首先需要将程序包上传到Hadoop FS中。
在这里插入图片描述
我们这里把我们自己开发的扩展部分(ext_jars,解释执行器及其相关jar)和Flink的原生程序包(app,扩展了一些数据库驱动)分成两部分,在我们进行容器化部署的时候,会将其合并。

Flink集群在容器中以Session模式运行,一个Flink集群可以运行多个计算任务。我们给Flink集群增加了一个标签,以区分各个Flink集群。我们设定开发环境,一个工作空间只能运行一个Flink集群,用来开发调试。一个工作空间,在生产环境可以运行1个或1个以上的不同标签的集群。在生产环境部署的时候,需要通过标签指定部署到那个集群,如果标签不存在,就会部署一个新的指定标签的集群,并在上面部署计算任务。
计算管道部署

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 动手学深度学习(pytorch土堆)-04torchvision中数据集的使用
  • C++:类与对象——详解继承、多继承、虚继承
  • SQL进阶技巧:每年在校人数统计 | 区间重叠问题
  • Python网络爬虫:如何高效获取网络数据
  • Python 基本库用法:数学建模
  • 黄仁勋的思维世界:Nvidia的AI和游戏之王
  • 论文翻译:ICLR-2024 PROVING TEST SET CONTAMINATION IN BLACK BOX LANGUAGE MODELS
  • C++当中的多态(三)
  • pytorch对不同的可调参数,分配不同的学习率
  • xxl-job
  • [C++]spdlog学习
  • P5735 【深基7.例1】距离函数
  • 【可测试性实践】C++ 单元测试代码覆盖率统计入门
  • 用go语言实现树和哈希表算法
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • 【跃迁之路】【463天】刻意练习系列222(2018.05.14)
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • Asm.js的简单介绍
  • Codepen 每日精选(2018-3-25)
  • Docker 笔记(2):Dockerfile
  • Electron入门介绍
  • ES10 特性的完整指南
  • go语言学习初探(一)
  • HTTP--网络协议分层,http历史(二)
  • IOS评论框不贴底(ios12新bug)
  • Java比较器对数组,集合排序
  • node-sass 安装卡在 node scripts/install.js 解决办法
  • Vim 折腾记
  • 阿里中间件开源组件:Sentinel 0.2.0正式发布
  • 创建一种深思熟虑的文化
  • 对超线程几个不同角度的解释
  • 看图轻松理解数据结构与算法系列(基于数组的栈)
  • 批量截取pdf文件
  • 前端临床手札——文件上传
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • 转载:[译] 内容加速黑科技趣谈
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • 格斗健身潮牌24KiCK获近千万Pre-A轮融资,用户留存高达9个月 ...
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • ​学习笔记——动态路由——IS-IS中间系统到中间系统(报文/TLV)​
  • ​油烟净化器电源安全,保障健康餐饮生活
  • # .NET Framework中使用命名管道进行进程间通信
  • #HarmonyOS:Web组件的使用
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • (7)svelte 教程: Props(属性)
  • (cos^2 X)的定积分,求积分 ∫sin^2(x) dx
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (补)B+树一些思想
  • (附源码)springboot社区居家养老互助服务管理平台 毕业设计 062027
  • (免费领源码)python#django#mysql校园校园宿舍管理系统84831-计算机毕业设计项目选题推荐
  • (文章复现)基于主从博弈的售电商多元零售套餐设计与多级市场购电策略
  • (一)基于IDEA的JAVA基础1
  • (转)Android学习笔记 --- android任务栈和启动模式
  • .NET 8.0 发布到 IIS