1

Airflow搭建与使用

 2 years ago
source link: https://shidawuhen.github.io/2022/03/22/Airflow%E6%90%AD%E5%BB%BA%E4%B8%8E%E4%BD%BF%E7%94%A8/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Airflow搭建与使用

2022-03-22

Airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。

Airflow 将workflow编排为由tasks组成的DAGs(有向无环图),调度器在一组workers上按照指定的依赖关系执行tasks。同时,Airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且Airflow提供了监控和报警系统。Airflow的调度依赖于crontab命令,与crontab相比Airflow可以直观的看到任务执行情况、任务之间的逻辑依赖关系、可以设定任务出错时邮件提醒、可以查看任务执行日志。

本次主要聊一下Airflow的搭建与简单使用,这应该是目前网上最全的配置了。这次搭建是在Centos7系统,Airflow版本是2.2.4。

# airflow需要home目录,默认是~/airflow,
# 但是如果你需要,放在其它位置也是可以的
# (可选)
export AIRFLOW_HOME=~/airflow
# 如果没有pip3,需要进行安装
sudo yum install -y python3-pip
# 升级,防止报错Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-buil
pip3 install --upgrade pip
# 使用pip从pypi安装
pip3 install apache-airflow

Airflow默认使用的是sqlite,正式环境需要改为MySQL。

安装MySQL

Centos下先安装MySQL,强烈建议直接安装8.0,否则各种异常情况。

# 安装
# 在可写权限目录下安装
sudo wget https://repo.mysql.com/mysql80-community-release-el7-3.noarch.rpm
sudo yum -y install mysql80-community-release-el7-3.noarch.rpm
sudo yum -y install mysql-community-server --nogpgcheck
# 启动
sudo systemctl start mysqld.service
# 关闭命令为sudo service mysqld stop
# 登录8.0前需要获取临时密码,5.6的初始密码为空,直接回车即可
grep "password" /var/log/mysqld.log
mysql -uroot -p
# 8.0修改root密码
ALTER USER 'root'@'localhost' IDENTIFIED BY '1234Abcd*';

创建Airflow使用的DB

CREATE DATABASE airflow CHARACTER SET utf8;
# 创建airflow用户
CREATE USER 'airflow'@'localhost' IDENTIFIED BY '1234Abcd*';
GRANT ALL PRIVILEGES ON *.* TO 'airflow'@'localhost';
# 如果mysql版本较低,无法更新,可更新my.cnf
set global explicit_defaults_for_timestamp =1;
FLUSH PRIVILEGES;

更改airflow.cfg

# sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

sql_alchemy_conn = mysql+pymysql://airflow:1234Abcd*@localhost:3306/airflow
result_backend = db+mysql://airflow:1234Abcd@localhost:3306/airflow
executor = LocalExecutor
# 初始化数据库,如果sqlite版本过低,需要升级。执行完后出现cfg文件
#airflow initdb 已废弃
airflow db init
# 启动web服务器,默认端口是8080
airflow webserver -p 8080 -D
# 启动定时器
airflow scheduler -D
# 创建账号密码
airflow users create \
--username admin \
--firstname pzq \
--lastname pzq \
--role Admin \
--email [email protected]
然后输入密码w
# 在浏览器中浏览localhost:8080,并在home页开启example dag

创建dags

查看airflow.cfg中的配置dags_folder,在该目录下添加python文件。

start_date表示从2022-3-21开始,使用的时区为UTC。schedule_interval表示每隔三分钟需要执行一次,Airflow会根据开始时间和间隔时间自动计算出下一次的执行时间。

import datetime
import pendulum

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
dag_id='my',
schedule_interval='*/3 * * * *',
start_date=pendulum.datetime(2022, 3, 21, tz="UTC"),
# start_date=days_ago(1),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
run_this_last = DummyOperator(
task_id='run_this_last',
)

# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
)
# [END howto_operator_bash]

run_this >> run_this_last

for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this

# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last

# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
task_id='this_will_skip',
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last

if __name__ == "__main__":
dag.cli()

代码中的A>>B,表示A先执行,后执行B。image-20220321224211377

BashOperator可以执行Bash命令,除了BashOperator外,还有其它Operator可以使用,具体可以查看资料。

过一会在Web页面上便能看到这个Dag:image-20220321224033292

关闭与重启

关闭airflow方案:

  1. ps -ef | grep -Ei ‘airflow’ | grep -v ‘grep’ | grep -v ‘%s’ | awk ‘{print $2}’ | xargs -i kill -9 {}

  2. 删除~/airflow下的airflow-webserver和airflow-scheduler

  3. airflow webserver -p 8080 -D

  4. airflow scheduler -D

  5. 特殊情况可重新设置db:airflow db reset

Airflow国内用的比较少,相关资料不全。如果想在实际业务中使用,对于Dag的基本组成部分、Operator等也要有所了解。

sqlit版本太低

wget –no-check-certificat https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK