一、创建python环境

创建3.8版本

conda create --name airflow python=3.8

进入环境

conda activate airflow

二、安装Airflow

设置airflow目录

vim /etc/profile

在末尾添加一行配置

export AIRFLOW_HOME=/root/airflow

使配置生效

source /etc/profile

查看配置是否生效

echo $AIRFLOW_HOME

pip安装指令

  • (尽可能要指定版本,不然会很随机安装版本,坑!项目github
pip install apache-airflow[celery]==2.7.3

查看版本

  • 如果在这一步都很多报错,则需重点排查,python版本是否匹配,推荐python3.8配合上最新apache-airflow==2.7.3(牢记,最好去github上找最新:项目github))
airflow version 

初始化 airflow

  • 执行此步后,会生成在“AIRFLOW_HOME”目录底下的airflow.cfg配置文件
airflow db init

三、配置Airflow

airlfow的配置是通过“AIRFLOW_HOME”目录底下的airflow.cfg配置文件进行读取的。打开airflow.cfg开始配置

mysql数据库配置

  • airflow默认是使用sqlite本地数据库,需要修改为mysql,官方文档推荐的两种链接方式有:

第一种mysql数据连接方式:mysql+mysqldb

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

安装中最坑的地方来了,mysqldb需要安装pip install mysqlclient,有装过的都知道很难装,而且基本装不好,直接舍弃不适用

第二种mysql数据连接方式:mysql+mysqlconnector

mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

这里更坑,如果没有细读官方文档,和对python有认知,根本配置不出来,其实是应该用pymysql

mysql+pymysql://<user>:<password>@<host>[:<port>]/<dbname>

修改airflow.cfg里面的mysql相关配置

  • 存储数据的链接,记得在此之前先创建好数据库schema—>airflow

sql_alchemy_conn = mysql+pymysql://:@[:]/

配置executor和对应的消息队列

  • 这边是由于个人业务需求,选用了celery作为我的执行器。所以配置内修改

executor = CeleryExecutor
broker_url = redis://redis:6379/0
result_backend = redis://redis:6379/1

排坑小指南

之前有点误导别人了,其实celery的结果存储是不支持mysql的,我看配置文件里面的附带的celery官方解释,再结合scheduler的log输出,不难排查问题,result_backend使用redis作为结果存储即可,记得最好不要放在和broker_url 一个db_index即可

重新初始化数据库

如果修改了数据库,则需要重新初始化数据库

airflow db init

初始化时遇到bug!!

  • 通过airflow db init初始化airflow的数据库时遇到了一下提示。
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1067, "Invalid default value for 'updated_at'")
  • 这种情况需要设置一下mysql的配置,去命令行执行以下语句(相关文章
set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

四、启动和停止Airflow

创建用户

airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 281523824@qq.com
-- 自动创建后,会要求输入密码,请自行填写
Password:
Repeat for confirmation:
[2023-12-09T00:21:11.683+0800] {manager.py:211} INFO - Added user %s
User "admin" created with role "Admin"

启动webserver

  • 详细参数信息,可以查阅官方文档
  • -p是端口,-D是守护进程,而不是在前台运行
airflow webserver -p 8080 -D

查看webserver运行状态

ps -ef | grep 'airflow' | grep 'webserver'

停止webserver

  • 官方没有优雅停机指令,只能使用暴力的!
ps -ef | grep 'airflow' | grep 'webserver' | awk '{print $2}' | xargs kill -9 
cd $AIRFLOW_HOME  
rm -rf airflow-webserver.pid
rm -rf airflow-webserver-monitor.pid

启动scheduler

  • -D是守护进程,而不是在前台运行
airflow scheduler -D

查看scheduler运行状态

ps -ef | grep 'airflow' | grep 'scheduler'

停止scheduler

  • 同样是暴力美学
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs kill -9  
cd $AIRFLOW_HOME  
rm -rf airflow-scheduler.pid

排坑小指南

这里有个很坑的bug,就是使用指令“airflow scheduler”能正常运行,但“airflow scheduler -D”就是启动不了,失效了,简单来说就是Airflow Scheduler只能作为非守护进程工作,而作为守护进程失败。对此进行了一系列的排查:

  1. 首先通过生成的日志进行查看。cd $AIRFLOW_HOME,来到airflow的目录。查看是否有airflow-scheduler.pid这个文件,如果没有就说明没有启动成功。
  2. 再查看airflow-scheduler.err、airflow-scheduler.log、airflow scheduler.out,这三个文件。查看报错异常是什么,我这里是 :
    File “/root/miniconda3/envs/airflow/lib/python3.8/site-packages/pymysql/connections.py”, line 801, in _write_bytes
    self._sock.settimeout(self._write_timeout)
    OSError: [Errno 9] Bad file descriptor
  3. 以上异常和pymysql相关 ,本人下载了源码进行查看,这块的处理确实没办法处理,就是在:
    File “/root/miniconda3/envs/airflow/lib/python3.8/site-packages/airflow/jobs/job.py”, line 230, in prepare_for_execution
    session.commit()
    这里是sqlalchemy的一个提交而已。会报异常,可能会跟很底层的东西有关。修改可能性不大

解决方案

通过nohup进行后台运行

nohup airflow scheduler > /root/airflow/airflow-scheduler.nohup 2>&1 &

关闭方法

pkill -f "airflow scheduler"
或者
ps aux | grep "airflow scheduler" | awk '{print $2}' | xargs kill -9  
或者	
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs kill -9  

页面正常访问

在这里插入图片描述

启动和停止脚本

本地

#!/bin/bash
case $1 in
"start")
    echo " --------启动 airflow-------"
    conda activate airflow
    airflow webserver -p 8080 -D
    nohup airflow scheduler > /root/airflow/airflow-scheduler.nohup 2>&1 &
    conda deactivate
    ;;
"stop")
    echo " --------关闭 airflow-------"
    ps -ef | egrep 'airflow scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
    ;;
esac

远程

#!/bin/bash
case $1 in
"start"){
	echo " --------启动 airflow-------"
	ssh hadoop102 "conda activate airflow;airflow webserver -p 8080 -D;nohup airflow scheduler > /root/airflow/airflow-scheduler.nohup 2>&1 &;conda deactivate"
};;
"stop"){
	echo " --------关闭 airflow-------"
	ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15
};;
esac

五、常用指令

测试任务,格式:airflow test dag_id task_id execution_time

airflow test test_task test1 2019-09-10

查看生效的 dags

airflow list_dags -sd $AIRFLOW_HOME/dags

开始运行任务(同web界面点trigger按钮)

airflow trigger_dag test_task

暂停任务

airflow pause dag_id

取消暂停,等同于在web管理界面打开off按钮

airflow unpause dag_id

查看task列表

airflow list_tasks dag_id 查看task列表

清空任务状态

airflow clear dag_id

运行task

airflow run dag_id task_id execution_date

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐