Airflow是什么?
Airflow是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间调度。
Airflow独立于我们要运行的任务,只需要把任务的名字和运行方式提供给Airflow作为一个task就可以。
系统环境
- 使用的airflow版本为
airflow==1.8
。注:1.9版本以上已改名为apache-airflow
Centos 7.x
,使用系统普通用户进行安装以及运行。即hhjie
用户。- 在普通用户
hhjie
中安装anaconda3
,并创建虚拟环境conda create -n airflow python=2.7
指定Python虚拟环境为2.7。如果只是安装了元数据,没有pip,则先安装pip
,在虚拟环境中conda install pip
,安装完成后使用pip -V
检查pip环境变量是否是虚拟环境中的。在anaconda中使用国内镜像。在当前目录下,输入:
conda config --add channels 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/'
conda config --set show_channel_urls yes
安装与使用。
系统依赖环境安装:
sudo yum install gcc gcc-c++ Fernet MySQL-python sqlite-devel lxml openssl openssl-devel mysql-devel python-devel
安装MySQL数据库支持
pip install airflow[mysql]
MySQL安装【自行安装】
- 新建用户和数据库
mysql> CREATE DATABASE airflow;
# 新建用户`admin`,密码为`admim-airflow`, 该用户对数据库`airflow`有完全操作权限
mysql> GRANT all privileges on airflow.* TO 'admin'@'localhost' IDENTIFIED BY 'admin-airflow';
mysql> FLUSH PRIVILEGES;
- 缺少mysql_config 执行命令:
ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config
安装主模块 airflow -->
pip install airflow
安装数据库模块、密码模块
pip install airflow[password]
配置airflow
- 设置环境变量
设置$AIRFLOW_HOME环境变量。首次执行airflow命令时,会在$AIRFLOW_HOME下面创建airflow的配置文件airflow.cfg。即在当前用户的~/.bash_profile
或者 ~/.bashrc
文件中新增export AIRFLOW_HOME=/data/airflow
然后source 文件生效。
-
输入
airflow
首次输入命令会在airflow的目录生成文件。 -
修改配置文件:查看airflow.cfg文件,整个文件分为core、cli、api、operators、webserver、email、smtp、celery、scheduler、mesos、kerberos、github_enterprise、admin几个部分。
对其中一些参数做修改,其它的保持默认值即可:-
[core]中
executor = SequentialExecutor
修改为executor = CeleryExecutor
;后面用到celery
-
[core]中
sql_alchemy_conn = sqlite:////etl/prod/airflow/airflow.db
修改为sql_alchemy_conn = mysql://user:password@localhost:3306/airflow
;使用MySQL数据库 -
[core]中
load_examples = True
修改为load_examples = False
;不使用示例 -
[core]中fernet_key = ; 利用脚本生成,粘贴进去。
-
[webserver]中
base_url =
修改为base_url = http://your-ip:8080
;修改为服务器ip地址 -
[webserver]中
authenticate = False
修改并新增authenticate = True
,auth_backend = airflow.contrib.auth.backends.password_auth
;web页面设置用户密码登录。 -
[smtp]中 修改邮件配置
-
[celery]中修改broker:
# broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
修改为broker_url = amqp://rabbitmq用户名:rabbitmq密码@localhost:5672/rabbitmq设置的虚拟主机名
=>broker_url = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq
-
[celery]中修改celery_result_backend。
# celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow]
修改为celery_result_backend = amqp://rabbitmq用户名:rabbitmq密码@localhost:5672/rabbitmq设置的虚拟主机名
,
=>celery_result_backend = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq
-
# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key) # your fernet_key, keep it in secured place!
启动airflow
-
初始化数据库
airflow initdb
-
创建用户:脚本创建。
# -*- coding: utf-8 -*-
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin'
user.email = 'your-username'
user.password = 'your-password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
启动airflow [暂时不要启动下面命令,配置文件中已修改完毕,要使用celery。]
airflow webserver -p 8080
后台启动: nohup airflow webserver -p 8080 &
airflow scheduler
后台启动:airflow scheduler
执行测试任务
在$AIRFLOW_HOME目录中创建dags目录。并将测试dag文件tutorial.py放入dags目录中。
# -*- coding: utf-8 -*-
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 05, 10),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval="*/10 * * * *")
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
至此,基本安装完成。下面使用
celery
安装celery
celery是一个分布式消息队列,在airflow中,使用celeryExecutor可以动态的增加worker个数并将任务在远程机器上执行.生产中建议使用celeryExecutor来执行.
安装celery模块
pip install airflow[celery]
安装celery broker
celery需要设置broker和result队列(可以用同样的)来保存消息.celery 支持多种broker:
安装airflow的RabbitMQ模块 :celery可以使用RabbitMQ或者redias等做为broker,甚至可以使用一些Experimental(实验性的)工具(如sqlalchemy支持的数据库),默认使用RabbitMQ.
当前可以使用RabbitMQ
和 Redis
两种方式
- 【官方推荐RabbitMQ】
- 经使用redis在使用过程中会报错json数据无法存入redis等相关问题
主要介绍使用RabbitMQ作为broker
-
安装airflow的celery和rabbitmq组件
pip install airflow[rabbitmq]
-
安装
pip install airflow[rabbitmq]
过程中,会报错没有librabbitmq
库,需要自己编译安装。
librabbitmq
库的获取需要对RabbitMQ-c
编译得到。- 下载
CMake
下载地址CMake 解压, - 进入
CMake
解压后的目录
./bootstrap && make && make install
- 下载rabbitmq-c。下载地址rabbitmq-c;
4 解压rabbitmq-c,进入解压后的目录;
mkdir build && cd build
[# 这一步是在rabbitmq-c的根目录下创建一个build子目录] cmake ..
[# 这一步是让cmake根据../CMakeList.txt,即rabbitmq-c的根目录下的CMakeList.txt创建Makefile文件,Makefile文件会被创建到build目录中]cmake --build .
[# 这一步是真正的build rabbitmq-c库的,注意,不要漏掉结尾的点 '.']- 会在build目录下生产librabbitmq目录,相关的库文件
librabbitmq.so*
在该目录中,将其放入anaconda3
的lib
目录中,即/usr/local/anaconda3/lib/
- 通过编译
rabbitmq-c
获取了librabbitmq.so
库文件就可以安装librabbitmq
模块了
pip install librabbitmq
- 下载
-
安装
RabbitMQ-server
;在安装RabbitMQ-server
之前需要安装erlang
的支持。- 确保
yum
有epel支持。yum install epel*
- 安装
erlang
的基础解决方案- 下载
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
- 安装
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
或者yum install erlang-solutions-1.0-1.noarch.rpm
- 添加源:
rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
- Add the following lines to some file in "/etc/yum.repos.d/":
- 下载
- 确保
-
[erlang-solutions]
name=CentOS $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1
5. 安装erlang `sudo yum install erlang`
- 安装RabbitMQ-server
1. 在[RabbitMQ-server](https://www.rabbitmq.com/install-rpm.html)官网下载安装包。
2. 安装下载的安装包例如:`yum install rabbitmq-server-3.6.1-1.noarch.rpm`
- 配置rabbitmq
- 启动rabbitmq: `rabbitmq-server -detached`
- 开机启动rabbitmq: `chkconfig rabbitmq-server on`
- 配置rabbitmq(REF)创建用户环境等等
rabbitmqctl add_user admin rabbitmq
rabbitmqctl add_vhost airflow-rabbitmq
rabbitmqctl set_user_tags admin airflow-rabbitmq
rabbitmqctl set_permissions -p airflow-rabbitmq admin ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management # no usage
> # 用户user: admin
> # 密码passwd: rabbitmq
> # 虚拟主机: airflow-rabbitmq
- 修改airflow配置文件支持Celery [提前在上文中配置文件中已经配好了]
系统启停命令
进入虚拟环境:
source activate airflow
启动:
nohup airflow webserver -p 1111 &
airflow scheduler -D
airflow worker -D
airflow flower -D
关闭:
ps aux | grep airflow | awk '{print $2}' | xargs sudo kill -9
ps aux | grep celeryd | awk '{print $2}' | xargs sudo kill -9
cd /etl/prod/airflow && rm *.pid -f
退出虚拟环境:
source deactivate airflow
解决的一些问题:
-
创建用户脚本运行后报错(“AttributeError: can't set attribute”)。
- 原因是 sqlalchemy这个Python包版本大于1.2;
pip uninstall sqlalchemy pip install 'sqlalchemy<1.2'
-
webserver界面显示的是UTC时间。如何修改为本地时间。
- 这里修改的是页面时间。1.9版本系统默认的是UTC时间,系统内部暂无不好改,必须要改源码。
vim /usr/local/lib/python2.7/site-packages/airflow/www/templates/admin/master.html //var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000); var UTCseconds = x.getTime();
-
webserver启动后,页面经常时不时的出现
Internal Server Error
500错误 -
同时MySQL数据库大量出现错误
[Note] Aborted connection 683 to db: 'airflow' user: 'admin' host: 'localhost' (Got an error reading communication packets)
- 对于告警日志, 第一条应该是程序还是sqlalchemy中未mysql_close()所以有这个告警, 通过设置wait_timeout为60秒后, 这种未正常关闭的连接sleep后自动关闭;
- set @@global.log_warnings=1;