当前位置: 首页 > news >正文

使用DataWorks调度DLA循环任务

DataWorks是阿里云上的一款热门产品,可以为用户提供大数据开发调度服务。它支持了Data Lake Analytics(后文简称DLA)以后,DLA用户可以通过它进行定时任务调度,非常方便。本文将主要介绍如何使用DataWorks调度DLA的循环任务。

场景

使用DLA对历史数据按天做清洗。数据清洗的SQL是固定的,只是每次执行的时候需要传入不同的日期。

对于这个场景,我们需要:

  1. 部署一个赋值节点。该节点负责输出日期值,并作为下游循环节点的输入。
  2. 部署一个循环节点。该节点包含用来做数据清洗的一个或者一组SQL,其中关于日期取值是一个变量。每次循环输入值由赋值节点提供。

DataWorks操作

步骤一:新建业务流程和节点

登录DataWorks的控制台,并创建一个业务流程或使用原有的业务流程。
1

在新建的业务流程下,创建一个赋值节点和一个循环节点。
2

步骤二:配置赋值节点

打开节点“日期集合”的编辑页面。这里我们选择SHELL语言,将要执行的日期值写在一个数组里。
3

打开节点“日期集合”的调度配置页面。
在这里需要给赋值节点设置一个上游节点,这里可以设置为当前工作空间的root。比如我的工作空间名字叫jinluo_poc,则该节点为jinluo_poc_root。
4

步骤三:配置循环节点

双击循环节点进入编辑页面。可以看到三个节点,分别是start, sql和end。这里我们需要新建一个DLA的任务节点,并把sql替换为一个DLA的任务节点。
5

在调度配置页面设置依赖关系和节点上下文。上游节点设置为赋值节点“日期集合”,本节点的输入为赋值节点的输出。
6

设置DLA_SQL节点

选择一个DLA的数据源,并填写SQL。
7

这里面的pure_date的值是从赋值节点读入的。每次读取赋值节点的输出结果数组中的一个值。写法是固定的,如下所示。

h.`pure_date`=${dag.input[${dag.offset}]}
  • dag.offset,这个是DataWorks系统的保留变量,代表每一次循环次数相对于第一次的偏移量。即第一次循环中offset为0、第二次为1、第三次为2…第n次为n-1
  • dag.input,这个变量是用户配置的循环节点的上下文依赖。循环节点内部节点如果需要引用这个上下文依赖的值,可以直接用${dag.${ctxKey}}的方式来引用,因为上文中配置的上下文依赖key为input,因此可以使用${dag.input}来引用这个值
  • ${dag.input[${dag.offset}]}:节点数据集初始化的输出是一个表格,DataWorks是可以用偏移量的方式来获取表格数据的某一行的。由于每次循环中dag.offset的值是从0开始递增的,所以最后echo出来的数据应该就是${dag.input[0]}、${dag.input[1]}以此类推达到遍历数据集的效果

设置end节点

该节点的作用是控制循环的结束。
end节点的结束条件:是把dag.loopTimes进行比较,小于则输出True继续循环;不小于则输出False退出循环。dag.input.length变量,标识上下文参数input数组的行数。是系统自动根据节点配置的上下文下发的变量。8

if ${dag.loopTimes} < ${dag.input.length}:
 print True
else:
 print False

在调度配置页面,需要设置上游节点。9

设置完成,保存后,可以看到循环节点变更为10

步骤四:发布

目前在DataWorks的开发界面暂不支持循环节点的运行,需要提交后在运维中心测试运行。
分别点击 “日期集合”和“数据清洗SQL”页面上的“提交按钮”进行提交。11

在提交循环节点时,注意要勾选上所有的节点。12

步骤五:运行

进入运维中心页面,在周期任务的列表里面可以看到我们刚刚提交的两个作业。13

右键“日期集合” -> 补数据 -> 当前节点及下游节点 可以手动执行该组任务。14

提交后可以看到每个节点的运行状态。16

参考

  1. DataWorks官方文档:

http://help.aliyun-inc.com/internaldoc/detail/102311.html?spm=a2c1f.8259796.2.25.24fa96d5a5twQO

相关文章:

  • postgresql行列转换函数
  • SQL中IF和CASE语句
  • Python Day3
  • [20190416]完善shared latch测试脚本2.txt
  • Windows下如何更新 node.js
  • 美团数据调优指北【转载】
  • 如何给列表降维?sum()函数的妙用
  • 好程序员分享JQuery.get提交页面不跳转的解决方法
  • QT 栅格布局
  • 如何根据业务封装自己的功能组件
  • 心理学解析一见钟情的感觉
  • 深入理解nvme hardware queue pair
  • 家具建材行业电商平台解决方案
  • SAP soamanager发布的Webservice服务,调用时出现http500报错
  • 日常英语---九、冒险岛link技能导读
  • 《Java编程思想》读书笔记-对象导论
  • 【162天】黑马程序员27天视频学习笔记【Day02-上】
  • 【MySQL经典案例分析】 Waiting for table metadata lock
  • 【React系列】如何构建React应用程序
  • 【RocksDB】TransactionDB源码分析
  • Android单元测试 - 几个重要问题
  • Asm.js的简单介绍
  • Js实现点击查看全文(类似今日头条、知乎日报效果)
  • JS字符串转数字方法总结
  • Ruby 2.x 源代码分析:扩展 概述
  • Twitter赢在开放,三年创造奇迹
  • 复习Javascript专题(四):js中的深浅拷贝
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 使用权重正则化较少模型过拟合
  • 在Docker Swarm上部署Apache Storm:第1部分
  • 做一名精致的JavaScripter 01:JavaScript简介
  • [地铁译]使用SSD缓存应用数据——Moneta项目: 低成本优化的下一代EVCache ...
  • 阿里云ACE认证学习知识点梳理
  • 仓管云——企业云erp功能有哪些?
  • 积累各种好的链接
  • ​【C语言】长篇详解,字符系列篇3-----strstr,strtok,strerror字符串函数的使用【图文详解​】
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • ​什么是bug?bug的源头在哪里?
  • (3)llvm ir转换过程
  • (4)事件处理——(6)给.ready()回调函数传递一个参数(Passing an argument to the .ready() callback)...
  • (ctrl.obj) : error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MDd_DynamicDebug”不匹配值“
  • (动态规划)5. 最长回文子串 java解决
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)springboot宠物医疗服务网站 毕业设计688413
  • (个人笔记质量不佳)SQL 左连接、右连接、内连接的区别
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)Android学习系列(31)--App自动化之使用Ant编译项目多渠道打包
  • (转)iOS字体
  • *p++,*(p++),*++p,(*p)++区别?
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • ./configure、make、make install 命令
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .net refrector
  • ?php echo ?,?php echo Hello world!;?
  • @RequestParam详解