当前位置: 首页 > news >正文

Apache Airflow (八) :DAG任务依赖设置

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. DAG任务依赖设置一

2. DAG任务依赖设置二

3. DAG任务依赖设置三

4. DAG任务依赖设置四

5. DAG任务依赖设置五


1. DAG任务依赖设置一

  • DAG调度流程图

  • task执行依赖
A >> B >>C
  • 完整代码
'''
airflow 任务依赖关系设置一'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'dag_relation_1', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)A >> B >>C

2. DAG任务依赖设置二

  • DAG调度流程图

  • task执行依赖​​​​​​​
[A,B] >>C >>D
  • 完整代码
'''
airflow 任务依赖关系设置二'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'dag_relation_2', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)[A,B] >>C >>D

3. DAG任务依赖设置三

  • DAG调度流程图

  • task执行依赖
[A,B,C] >>D >>[E,F]
  • 完整代码
'''
airflow 任务依赖关系设置三'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'dag_relation_3', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)[A,B,C] >>D >>[E,F]

​​​​​​​4. DAG任务依赖设置四

  • DAG调度流程图

  • task执行依赖
A >>B>>C>>D
A >>E>>F
  • 完整代码
'''
airflow 任务依赖关系设置四'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'dag_relation_4', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)F = BashOperator(task_id='F',bash_command='echo "run F task"',dag=dag
)A >>[B,C,D]
A >>[E,F]

5. DAG任务依赖设置五

  • DAG调度流程图

  • task执行依赖
A >>B>>E
C >>D>>E
  • 完整代码
'''
airflow 任务依赖关系设置五'''
from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'dag_relation_5', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)A = BashOperator(task_id='A',bash_command='echo "run A task"',dag=dag
)B = BashOperator(task_id='B',bash_command='echo "run B task"',dag=dag
)C = BashOperator(task_id='C',bash_command='echo "run C task"',dag=dag,retries=3
)D = BashOperator(task_id='D',bash_command='echo "run D task"',dag=dag
)E = BashOperator(task_id='E',bash_command='echo "run E task"',dag=dag
)A >>B>>E
C >>D>>E

相关文章:

  • Docker push的 http 413问题处理
  • 卡尔曼家族从零解剖-(07) 高斯分布积分为1,高斯分布线性变换依旧为高斯分布,两高斯函数乘积仍为高斯。
  • 智慧汽车—城市NOA迎爆发
  • 【Python】Pandas(学习笔记)
  • 大数据毕业设计选题推荐-机房信息大数据平台-Hadoop-Spark-Hive
  • 学习王阳明知行合一随录
  • yolov5模型代码怎么修改
  • 【ArcGIS处理】行政区划与流域区划间转化
  • C语言编程陷阱(三)
  • 此芯科技加入绿色计算产业联盟,参编绿色计算产业发展白皮书
  • Ansys Electronics Desktop仿真——HFSS线圈寄生电阻,电感
  • 本地开发环境和服务器传输数据的几种方法
  • vue3路由
  • 基础框架代码解释
  • mybatis之主键返回
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • Android组件 - 收藏集 - 掘金
  • CSS 三角实现
  • extjs4学习之配置
  • GitUp, 你不可错过的秀外慧中的git工具
  • IOS评论框不贴底(ios12新bug)
  • JavaScript 一些 DOM 的知识点
  • JavaWeb(学习笔记二)
  • js递归,无限分级树形折叠菜单
  • MYSQL 的 IF 函数
  • supervisor 永不挂掉的进程 安装以及使用
  • Terraform入门 - 3. 变更基础设施
  • unity如何实现一个固定宽度的orthagraphic相机
  • 大整数乘法-表格法
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 数组的操作
  • Nginx实现动静分离
  • $$$$GB2312-80区位编码表$$$$
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (Bean工厂的后处理器入门)学习Spring的第七天
  • (day6) 319. 灯泡开关
  • (done) 两个矩阵 “相似” 是什么意思?
  • (第9篇)大数据的的超级应用——数据挖掘-推荐系统
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF
  • (四)Android布局类型(线性布局LinearLayout)
  • (五)大数据实战——使用模板虚拟机实现hadoop集群虚拟机克隆及网络相关配置
  • (一)UDP基本编程步骤
  • (转)Linux整合apache和tomcat构建Web服务器
  • (轉貼) 寄發紅帖基本原則(教育部禮儀司頒布) (雜項)
  • .net core 控制台应用程序读取配置文件app.config
  • .NET 反射的使用
  • .NET/C# 判断某个类是否是泛型类型或泛型接口的子类型
  • .NET导入Excel数据
  • .Net中ListT 泛型转成DataTable、DataSet
  • .Net转前端开发-启航篇,如何定制博客园主题
  • // an array of int
  • /etc/sudoers (root权限管理)
  • [boost]使用boost::function和boost::bind产生的down机一例
  • [CareerCup] 13.1 Print Last K Lines 打印最后K行