1.1 Installation environment
1.2 Create a user
2. Install Airflow
2.1 Install Python
2.2 Install pip
2.3 Install the database
2.4 Install Airflow
2.4.1 Install the core module
2.4.2 Install the database and password modules
2.5 Configure Airflow
2.5.1 Set environment variables
2.5.2 Edit the configuration file
3. Start Airflow
3.1 Initialize the database
3.2 Create a user
3.3 Start Airflow
4. Run a task
5. Install Celery
5.1 Install the Celery module
5.2 Install a Celery broker
5.2.1 Using RabbitMQ as the broker
5.2.2 Using Redis as the broker
5.3 Edit the Airflow config to enable Celery
5.4 Test Celery
5.5 Deploy multiple workers
6. Problems
Official documentation:
http://airflow.incubator.apache.org/project.html
1. Environment preparation
1.1 Installation environment
- CentOS 6.7 (Docker)
- Python 2.7.13
docker run --name airflow -h airflow -dti --net hadoopnet --ip=172.18.0.20 -p 10131:22 -v /dfs/centos/airflow/home:/home -v /dfs/centos/airflow/opt:/opt yangxw/centos:6.7
1.2 Create a user
[root@airflow ~]# groupadd airflow
[root@airflow ~]# useradd airflow -g airflow
2. Install Airflow
2.1 Install Python
The official site only ships source packages, so Python would have to be compiled from source (see: compiling and installing Python 2.7.13). Compiling Python requires upgrading gcc, which in turn has to be compiled itself; that is too complicated, so just download Anaconda, a Python distribution with everything bundled:
wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/
2.2 Install pip
Anaconda bundles pip, so it can be used directly.
2.3 Install the database
Airflow supports MySQL, PostgreSQL, Oracle, and others. PostgreSQL is used here; install it with yum install postgresql-server, as sketched below.
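A minimal setup sketch for a local PostgreSQL on CentOS 6 (the airflow user and database names here are placeholders; adjust them to match the sql_alchemy_conn setting in 2.5.2):
[root@airflow ~]# yum install postgresql-server
[root@airflow ~]# service postgresql initdb
[root@airflow ~]# service postgresql start
[root@airflow ~]# su - postgres -c "psql -c \"CREATE USER airflow PASSWORD 'airflow';\""
[root@airflow ~]# su - postgres -c "psql -c \"CREATE DATABASE airflow OWNER airflow;\""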
2.4 Install Airflow
Airflow components are installed modularly: install only the components you actually use, for example the extras shown below.
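For instance, these are the extras used later in this guide (pip extra names as published for Airflow 1.8):
pip install "airflow[postgres]"   # PostgreSQL database support
pip install "airflow[password]"   # password authentication for the web UI
pip install "airflow[celery]"     # CeleryExecutor support
pip install "airflow[rabbitmq]"   # RabbitMQ broker support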

2.4.1 Install the core module
[airflow@airflow ~]$ pip install airflow
2.4.2 Install the database and password modules
[airflow@airflow ~]$ pip install "airflow[postgres,password]"
2.5 Configure Airflow
2.5.1 Set environment variables
First set the $AIRFLOW_HOME environment variable. The first time the airflow command runs, it creates the configuration file airflow.cfg under $AIRFLOW_HOME.
[airflow@airflow ~]$ vi .bashrc
export AIRFLOW_HOME=/home/airflow/airflow01
[airflow@airflow ~]$ airflow
[2017-05-08 02:00:04,677] {__init__.py:57} INFO - Using executor SequentialExecutor
usage: airflow [-h]
{resetdb,render,variables,connections,pause,task_failed_deps,version,trigger_dag,initdb,test,unpause,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,upgradedb}
…
airflow: error: too few arguments
[airflow@airflow ~]$ ll airflow01/
total 16
-rw-rw-r-- 1 airflow airflow 11418 May 8 02:00 airflow.cfg
-rw-rw-r-- 1 airflow airflow 1549 May 8 02:00 unittests.cfg
2.5.2 Edit the configuration file
The airflow.cfg file is divided into core, cli, api, operators, webserver, email, smtp, celery, scheduler, mesos, kerberos, github_enterprise, and admin sections.
Change the following parameters and leave the rest at their default values:
[core]
airflow_home = /home/airflow/airflow01
dags_folder = /home/airflow/airflow01/dags #directory for DAG python files
executor = LocalExecutor #use local mode for now
base_log_folder = /home/airflow/airflow01/logs #main log directory
sql_alchemy_conn = postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen
load_examples = True
default_impersonation = xiaowen.yang
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth #the 1.8.1 cfg file does not include this parameter; it must be added, otherwise you get "airflow.exceptions.AirflowException: Failed to import authentication backend"
filter_by_owner = true
web_server_host = XXX.XXX.XXX.XXX #web server host IP
base_url = http://XXX.XXX.XXX.XXX:8080 #web server IP:PORT
[smtp]
smtp_host = smtp.exmail.qq.com
smtp_user = bd-no-reply@bqjr.cn
smtp_password = BQJRbd@2016
smtp_mail_from = bd-no-reply@bqjr.cn
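To sanity-check the SMTP settings above before relying on task e-mails, a small stand-alone script like the following can be used (plain Python smtplib; the host and user values repeat the config above, the password is a placeholder, and STARTTLS is attempted because Airflow's smtp_starttls defaults to True — adjust if your server differs):
import smtplib

# Values repeated from the [smtp] section above; fill in the real password
host = 'smtp.exmail.qq.com'
user = 'bd-no-reply@bqjr.cn'
password = 'your_smtp_password'

server = smtplib.SMTP(host, 25)  # Airflow's default smtp_port is 25
server.starttls()                # matches smtp_starttls = True
server.login(user, password)
server.quit()
print('SMTP login OK')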
3. Start Airflow
3.1 Initialize the database
[airflow@airflow ~]$ airflow initdb
3.2 Create a user
Since password authentication was enabled above, create a web UI user from a Python shell:
$ python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
3.3 Start Airflow
[airflow@airflow ~]$ airflow webserver -p 8080
[airflow@airflow ~]$ airflow scheduler
If no errors appear, the startup succeeded.
You can now open the Airflow web UI in a browser.
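When running on a server, both commands also accept -D to run as background daemons (flag as in the Airflow 1.8 CLI):
[airflow@airflow ~]$ airflow webserver -p 8080 -D
[airflow@airflow ~]$ airflow scheduler -D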

4. Run a task
Airflow tasks are defined in Python programs. Below is a simple example.
Create the dags and logs directories under $AIRFLOW_HOME.
vi testBashOperator.py
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 9),
    'email': ['xiaowen.yang@bqjr.cn'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testBashOperator', default_args=default_args)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
(While developing, the webserver can also be run in debug mode: airflow webserver --debug.)
Run python testBashOperator.py to check that the file compiles, then run airflow run testBashOperator print_date 2017-05-09 to execute the task. The DAG information then shows up in the web UI.
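For trying out a single task during development, the airflow test subcommand (visible in the usage output in 2.5.1) runs one task instance locally without recording its state in the database:
[airflow@airflow ~]$ airflow test testBashOperator print_date 2017-05-09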

5. Install Celery
Celery is a distributed task queue. With the CeleryExecutor, Airflow can dynamically add workers and execute tasks on remote machines. The CeleryExecutor is recommended for production.
5.1 Install the Celery module
pip install airflow[celery]
5.2 Install a Celery broker
Celery needs a broker and a result backend (the same service can serve both) to store messages. Celery supports several brokers; RabbitMQ and Redis are covered below.
5.2.1 Using RabbitMQ as the broker
- Install Airflow's RabbitMQ module.
Celery can use RabbitMQ, Redis, and other brokers, and even some experimental options (such as databases supported by SQLAlchemy); RabbitMQ is the default.
pip install "airflow[rabbitmq]"
- Install the RabbitMQ server (it pulls in more than 160 dependency packages!):
yum install rabbitmq-server
Then start it:
service rabbitmq-server start
- Configure RabbitMQ (see http://blog.csdn.net/qazplm12_3/article/details/53065654):
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
5.2.2 Using Redis as the broker
- Install the Celery Redis module:
pip install -U "celery[redis]"
- Install the Redis database:
yum install redis
- Start Redis:
service redis start
- Edit the Airflow config file to point at Redis:
broker_url = redis://localhost:6379/0
celery_result_backend = redis://localhost:6379/0
5.3 Edit the Airflow config to enable Celery
Edit airflow.cfg:
[core]
executor = CeleryExecutor
[celery]
broker_url = amqp://ct:152108@localhost:5672/ct_airflow
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
5.4 Test Celery
[airflow@airflow ~]$ airflow webserver -p 8100
[airflow@airflow ~]$ airflow scheduler
[airflow@airflow ~]$ airflow worker #start the Celery worker for the CeleryExecutor
You can watch the CeleryExecutor start up. Then run airflow run testBashOperator print_date 2017-05-09 again and watch how the CeleryExecutor handles it.
5.5 Deploy multiple workers
On each machine that should run jobs, install the airflow, airflow[celery], and celery[redis] modules and start airflow worker. Jobs can then run on multiple nodes. Tasks can also be routed to specific workers through queues, as sketched below.
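A hedged sketch of queue-based routing (the queue name bash_queue is illustrative, echoing the commented-out 'queue' key in default_args above): give a task a queue in the DAG file, and have only certain workers subscribe to that queue.
# In the DAG file: pin the task to a named queue
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    queue='bash_queue',  # only workers listening on this queue pick it up
    dag=dag)
On the worker machine (worker01 is a hypothetical host), subscribe to that queue only:
[airflow@worker01 ~]$ airflow worker -q bash_queue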
6. Problems
The following error occurred when running inside Docker; it went away after switching to a physical machine:
[2017-05-10 09:14:59,777: ERROR/Worker-1] Command 'airflow run testFile echoDate 2017-05-10T00:00:00 --local -sd DAGS_FOLDER/testFile.py' returned non-zero exit status 1
[2017-05-10 09:14:59,783: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c5d5ea39-0141-46bb-b33a-06a924c07508] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 59, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed
References:
http://airflow.incubator.apache.org
https://my.oschina.net/u/2297683/blog/751880
http://blog.csdn.net/qazplm12_3/article/details/53065654
http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
http://www.rabbitmq.com/install-rpm.html
