1 概述

Airflow是一个以编程方式编写,用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。

Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。Airflow的可扩展Python框架可以让你构建连接几乎任何技术的工作流程。丰富的用户界面可以随时查看生产中正在运行的管道,帮助你管理工作流程的状态,监视进度以及需要时对问题进行故障排除。

Airflow的主要组件有:

DAG(有向无环图):使用Airflow将工作流编写任务的有向无环图(DAG)。一个DAG定义了一个工作流,它包含所有任务、任务的依赖关系和时间表。

任务(Task):一个任务定义了一个单独的单元工作,有一个确定的开始和结束。一个任务可以依赖于其他任务。

运算符(Operator):一个运算符封装了一个任务,并定义了它的执行逻辑。Airflow内置了许多运算符,如BashOperator、PythonOperator、EmailOperator等。你也可以自定义运算符。

时间轴(Timeline):时间轴让你以图形方式查看 DAG 的运行情况和状态。

调度器(Scheduler):调度器监视时间轴并触发需要运行的任务。

执行器(Executor):executor负责实际运行任务。Airflow支持多种executor,如LocalExecutor, CeleryExecutor, KubernetesExecutor 等。

2 名词

(1)Dynamic:Airflow管道是用Python代码配置的,允许动态生成管道。Airflow配置需要使用Python,这允许编写可动态实例化管道的代码。

(2)Extensible:Airflow框架包含许多运算符来连接各种技术。Airflow的所有组件都是可扩展的。轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。

(3)Elegant:Airlfow是精简灵活的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中。

(4)Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务。

3 airflow优缺点

优点:

Python脚本实现DAG,非常容易扩展;

可实现复杂的依赖规则;

外部依赖较少,搭建容易,仅依赖DB和rabbitmq;

工作流依赖可视化。有一套完整的UI,可视化展现所有任务的状态及历史信息;(本人刚开始主要看重这点)

完全支持crontab定时任务格式,可以通过crontab格式指定任务何时进行;

业务代码和调度系统解耦,每个业务的流程代码以独立的Python脚本描述,里面定义了流程化的节点来执行业务逻辑,支持任务的热加载.

缺点:

Airflow是为有限的批处理工作流构建的。虽然CLI和REST API确实允许触发工作流,但Airflow不是为无限运行的基于事件的工作流构建的。Airflow不是流解决方案。然而,像Apache Kafka这样的流系统通常与Apache Airflow一起使用。Kafka可以用于实时接收和处理事件数据,事件数据被写入存储位置,Airflow定期启动处理一批数据的工作流。

如果你更喜欢点击而不是编码,Airflow可能不是正确的解决方案。Web界面旨在最大限度地简化工作流的管理,Airflow框架不断改进以最大限度地简化开发人员体验。然而,Airflow的理念是将工作流定义为代码,所以代码始终是必需的。

4 Airflow安装

airflow官网地址:https://airflow.apache.org。

1)先安装并配置好python环境(可以参考Anaconda安装即可,如果项目不需要依赖太多工具包,可选择更简洁的MiniConda)并激活。

2)安装airflow

pip install apache-airflow

3)初始化airflow

airflow db init

4)查看版本

airflow version

5)启动airflow web服务,启动后浏览器访问http://ip_address:12025(如果不知道ip地址的就用ifconfig命令去linux下获取)

airflow webserver -p 12025 -D

6)启动airflow调度

airflow scheduler -D

7)创建账号(斜杠别忘记了)

airflow users create \

  --username admin \

  --firstname trisyp \

  --lastname trisyp \

  --role Admin \

  --email trisyp@email.com

回车之后会让你输入两次password,我们就用123456

8)启动停止脚本

vim af.sh

#!/bin/bash

case $1 in

"start"){

    echo " --------启动 airflow-------"

    ssh ip_address "conda activate airflow;airflow webserver -p 12025 -D;airflow scheduler -D; conda deactivate"

};;

"stop"){

    echo " --------关闭 airflow-------"

    ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15

};;

esac

添加权限即可使用。

trisyp@ip_address bin]$ chmod +x af.sh

5 修改数据库为MySQL

1)先在MySQL中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭MySQLSSL证书

查看SSL是否开启  YES为开启

mysql> SHOW VARIABLES LIKE '%ssl%';

+---------------+-----------------+

| Variable_name | Value           |

+---------------+-----------------+

| have_openssl  | YES             |

| have_ssl      | YES             |

| ssl_ca        | ca.pem          |

| ssl_capath    |                 |

| ssl_cert      | server-cert.pem |

| ssl_cipher    |                 |

| ssl_crl       |                 |

| ssl_crlpath   |                 |

| ssl_key       | server-key.pem  |

+---------------+-----------------+

3)修改配置文件my.cnf(注意:直接数据库修改值不起作用),加入以下内容:

# disable_ssl

skip_ssl

4)添加python连接的依赖,官网介绍的方法有两种:

这里我们选择mysql+mysqlconnector。

pip install mysql-connector-python

5)修改airflow的配置文件(vim ~/airflow/airflow.cfg):

[database]

# The SqlAlchemy connection string to the metadata database.

# SqlAlchemy supports many different database engines.

# More information here:

# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

#sql_alchemy_conn = sqlite:home/trisyp/airflow/airflow.db

sql_alchemy_conn = mysql+mysqlconnector://root:123456@ip_address:3306/airflow_db

6)关闭airflow,初始化后重启:

af.sh stop

airflow db init

af.sh start

7)若初始化报错1067 - Invalid default value for ‘update_at’:

原因:字段 'update_at' timestamp类型,取值范围是:1970-01-01 00:00:00 2037-12-31 23:59:59UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改mysql存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启MySQL会造成参数失效(注意:这样就需要重新创建账号),推荐将参数写入到配置文件my.cnf中。

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

6 修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

1)修改airflow的配置文件(vim ~/airflow/airflow.cfg)

[core]

# The executor class that airflow should use. Choices include

# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,

# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the

# full import path to the class when using a custom executor.

executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。

7 部署使用

1)测试环境启动

本次测试使用的是spark的官方案例,所有需要启动hadoop和spark的历史服务器。

myhadoop.sh start

cd /opt/module/spark-yarn/sbin/start-history-server.sh

2)查看Airflow配置文件

vim ~/airflow/airflow.cfg

3)编写.py脚本,创建work-py目录用于存放python调度脚本

mkdir ~/airflow/dags

cd dags/

然后把脚本文件放到dags文件夹,代码如下:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {  # 设置默认参数。

    # 用户

    'owner': 'test_owner',

    # 是否开启任务依赖

    'depends_on_past': True,

    # 邮箱

    'email': ['trisyp@email.com'],

    # 启动时间

    'start_date':datetime(2022,11,28),

    # 出错是否发邮件报警

    'email_on_failure': False,

    # 重试是否发邮件报警

    'email_on_retry': False,

    # 重试次数

    'retries': 3,

    # 重试时间间隔

    'retry_delay': timedelta(minutes=5),

}

# 声明任务图,schedule_interval:调度频率。

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

 

# 创建单个任务

t1 = BashOperator(  # BashOperator:具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

    # 任务id:任务唯一标识(必填)。

    task_id='dwd',

    # 具体任务执行命令。

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    # 重试次数

    retries=3,

    # 把任务添加进图中

    dag=dag)

t2 = BashOperator(

    task_id='dws',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

t3 = BashOperator(

    task_id='ads',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

# 设置任务依赖:ads任务依赖dws任务依赖dwd任务。

t2.set_upstream(t1)

t3.set_upstream(t2)

4)等待一段时间,刷新任务列表

airflow dags list

5)已出现myairflow_execute_bash任务(刷新页面)

6)点击运行

7)查看dag图、甘特图,点击成功任务,查看日志

8)查看脚本代码

9)Dag任务操作

9.1 删除Dag任务

主要删除DAG任务不会删除底层文件,过一会还会自动加载回来。

9.2 查看当前所有dag任务

# 查看所有任务

airflow dags list

# 查看单个任务

airflow tasks list test --tree

8 配置邮件服务器

1)保证邮箱已开SMTP服务

2)修改airflow配置文件,用stmps服务对应587端口

vim ~/airflow/airflow.cfg 

smtp_host = smtp.qq.com

smtp_starttls = True

smtp_ssl = False

smtp_user = trisyp@email.com

# smtp_user =

smtp_password = qluxdbuhgrhgbigi

# smtp_password =

smtp_port = 587

smtp_mail_from = trisyp@email.com

3)重启airflow

af.sh stop

af.sh start

4)编辑test.py脚本,加入emailOperator

from airflow.operators.email_operator import EmailOperator

email=EmailOperator(

    task_id="email",

    to="yaohm163@163.com ",

    subject="test-subject",

    html_content="<h1>test-content</h1>",

    cc="trisyp@email.com ",

    dag=dag)

t2.set_upstream(t1)

t3.set_upstream(t2)

email.set_upstream(t3)

5)查看页面是否生效

6)运行测试

9 避坑指南

1)Exception rendering Jinja template for task

2)Intel MKL FATAL ERROR: Cannot load ../numexpr/../../../libmkl_rt.so.1.

强制更新airflow到最新版

3)error: subprocess-exited-with-error

解决方案:

错误有明确的提示,缺少pkg-config,所以就先安装这个包,然后在安装mysqlclient。

sudo apt-get install pkg-config

4)Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)

解决方案:

先用命令“find / -name ‘mysql.sock”来查看下这个文件所在目录,如果有就建立软连接(不要想着拷贝复制,无效的),命令是“ln -s /tmp/mysql.sock”。如果没有就找my.cnf文件,一般文件地址为/etc/mysql/my.cnf,然后通过vim加上socket路径信息,一定要加mysqld这个分组,不然会报找不到分组这个错;Found option without preceding group

5)Segmentation fault (core dumped)

解决方案:

在配置mysql存储的时候要加上mysqlconnector就解决了。这个坑非常恶心,你参照某些教程直接只配mysql,忽视了connector,碰到了还找不到解决方案,因为核心存储转移你不知道怎么搞。

cd /etc

vim profile

加入:

export AIRFLOW_HOME=/root/airflow

sudo mysql

create database airflow_db;

create user 'airflow'@'%' identified by '123456';

grant all on airflow_db .* to 'airflow'@'%';

sql_alchemy_conn = mysql://airflow:123456@10.0.0.22:3306/airflow_db

10 参考链接

https://yuchaoshui.com/1bd10cc/

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐