当前位置: 首页 > news >正文

Apache Airflow (四) :Airflow 调度shell命令

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


上文说到使用Airflow进行任务调度大体步骤如下:

  1. 创建python文件,根据实际需要,使用不同的Operator
  2. 在python文件不同的Operator中传入具体参数,定义一系列task
  3. 在python文件中定义Task之间的关系,形成DAG
  4. 将python文件上传执行,调度DAG,每个task会形成一个Instance
  5. 使用命令行或者WEBUI进行查看和管理

以上python文件就是Airflow python脚本,使用代码方式指定DAG的结构。

下面我们以调度执行shell命令为例,来讲解Airflow使用。

1. 首先我们需要创建一个python文件,导入需要的类库

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.operators.bash import BashOperator

注意:以上代码可以在开发工具中创建,但是需要在使用的python3.7环境中导入安装Airflow包。

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2. 实例化DAG

from datetime import datetime, timedelta# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)

注意:

  • 实例化DAG有三种方式

第一种方式:

with DAG("my_dag_name") as dag:op=XXOperator(task_id="task")

第二种方式(以上采用这种方式):

my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

第三种方式:

@dag(start_date=days_ago(2))
def generate_dag():op = XXOperator(task_id="task")
dag = generate_dag()
  • baseoperator基础参数说明:

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多参数。

  • DAG参数说明

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html 查看DAG参数说明,也可以直接在开发工具点击DAG进入源码看下对应参数有哪些。

3. 定义Task

当实例化Operator时会生成Task任务,从一个Operator中实例化出来对象的过程被称为一个构造方法,每个构造方法中都有“task_id”充当任务的唯一标识符。

下面我们定义三个Operator,也就是三个Task,每个task_id 不能重复。

# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)

注意:

  • 每个operator中可以传入对应的参数,覆盖DAG默认的参数,例如:last task中“retries”=3 就替代了默认的1。任务参数的优先规则如下:①.显示传递的参数 ②.default_args字典中存在的值③.operator的默认值(如果存在)。
  • BashOperator使用方式参照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator

4. 设置task依赖关系

#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

注意:当执行脚本时,如果在DAG中找到一条环形链路(例如:A->B->C-A)会引发异常。更多DAG task依赖关系可参照官网:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies

5. 上传python配置脚本

到目前为止,python配置如下:

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.example_dags.example_bash_operator import dagfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

将以上python配置文件上传到$AIRFLOW_HOME/dags目录下,默认$AIRFLOW_HOME为安装节点的“/root/airflow”目录,当前目录下的dags目录需要手动创建。

6. 重启Airflow

“ps aux|grep webserver”和“ps aux|grep scheduler”找到对应的airflow进程杀掉,重新启动Airflow。重启之后,可以在airflow webui看到对应的DAG ID ”myairflow_execute_bash”。

7. 执行airflow

按照如下步骤执行DAG,首先打开工作流,然后“Trigger DAG”执行,随后可以看到任务执行成功。

查看task执行日志:


相关文章:

  • 手写链表C++
  • Hadoop学习总结(使用Java API操作HDFS)
  • [工业自动化-10]:西门子S7-15xxx编程 - PLC主站 - 信号量:数字量
  • C语言如何执行HTTP GET请求
  • linux espeak语音tts;pyttsx3 ubuntu使用
  • Linux系统编程——文件的光标移动
  • 前端设计模式之【访问者模式】
  • 计算机视觉与深度学习 | 改进的SIFT立体匹配算法
  • IP行业API助力于网络分析和数据挖掘
  • centos安装docker和docker-compose
  • 华为eNSP实验-QinQ基本实验
  • 【OpenHarmony内核】Harmony内核之线程操作函数(二)
  • sql语句-实体属性有集合怎么批量查询
  • react 修改less文件后保存,内存溢出,项目崩溃问题解决
  • 解锁潜在商机的钥匙——客户管理系统公海池
  • Bytom交易说明(账户管理模式)
  • Java 内存分配及垃圾回收机制初探
  • Js基础知识(四) - js运行原理与机制
  • PHP 7 修改了什么呢 -- 2
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • XML已死 ?
  • 高性能JavaScript阅读简记(三)
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 前言-如何学习区块链
  • 使用SAX解析XML
  • 项目管理碎碎念系列之一:干系人管理
  • 做一名精致的JavaScripter 01:JavaScript简介
  • 仓管云——企业云erp功能有哪些?
  • ​一文看懂数据清洗:缺失值、异常值和重复值的处理
  • #pragma once
  • $$$$GB2312-80区位编码表$$$$
  • (1)(1.19) TeraRanger One/EVO测距仪
  • (42)STM32——LCD显示屏实验笔记
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (javascript)再说document.body.scrollTop的使用问题
  • (Matalb分类预测)GA-BP遗传算法优化BP神经网络的多维分类预测
  • (分布式缓存)Redis持久化
  • (分享)自己整理的一些简单awk实用语句
  • (三分钟了解debug)SLAM研究方向-Debug总结
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (十五)使用Nexus创建Maven私服
  • (转)setTimeout 和 setInterval 的区别
  • (转)Unity3DUnity3D在android下调试
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • .halo勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .Net Core和.Net Standard直观理解
  • .NET Framework杂记
  • .NET:自动将请求参数绑定到ASPX、ASHX和MVC(菜鸟必看)
  • .NET精简框架的“无法找到资源程序集”异常释疑
  • .net开发时的诡异问题,button的onclick事件无效
  • .NET牛人应该知道些什么(2):中级.NET开发人员
  • .NET使用存储过程实现对数据库的增删改查
  • [2016.7 Day.4] T1 游戏 [正解:二分图 偏解:奇葩贪心+模拟?(不知如何称呼不过居然比std还快)]
  • [2019.3.20]BZOJ4573 [Zjoi2016]大森林