Airflow是什么?
Airflow是一個工作流分配管理系統,通過有向非循環圖的方式管理任務流程,設置任務依賴關系和時間調度。
Airflow獨立於我們要運行的任務,只需要把任務的名字和運行方式提供給Airflow作為一個task就可以。
系統環境
- 使用的airflow版本為
airflow==1.8。注:1.9版本以上已改名為apache-airflow Centos 7.x,使用系統普通用戶進行安裝以及運行。即hhjie用戶。- 在普通用戶
hhjie中安裝anaconda3,並創建虛擬環境conda create -n airflow python=2.7指定Python虛擬環境為2.7。如果只是安裝了元數據,沒有pip,則先安裝pip,在虛擬環境中conda install pip,安裝完成后使用pip -V檢查pip環境變量是否是虛擬環境中的。在anaconda中使用國內鏡像。在當前目錄下,輸入:
conda config --add channels 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/'
conda config --set show_channel_urls yes
安裝與使用。
系統依賴環境安裝:
sudo yum install gcc gcc-c++ Fernet MySQL-python sqlite-devel lxml openssl openssl-devel mysql-devel python-devel
安裝MySQL數據庫支持
pip install airflow[mysql]
MySQL安裝【自行安裝】
- 新建用戶和數據庫
mysql> CREATE DATABASE airflow;
# 新建用戶`admin`,密碼為`admim-airflow`, 該用戶對數據庫`airflow`有完全操作權限
mysql> GRANT all privileges on airflow.* TO 'admin'@'localhost' IDENTIFIED BY 'admin-airflow';
mysql> FLUSH PRIVILEGES;
- 缺少mysql_config 執行命令:
ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config
安裝主模塊 airflow -->
pip install airflow
安裝數據庫模塊、密碼模塊
pip install airflow[password]
配置airflow
- 設置環境變量
設置$AIRFLOW_HOME環境變量。首次執行airflow命令時,會在$AIRFLOW_HOME下面創建airflow的配置文件airflow.cfg。即在當前用戶的~/.bash_profile 或者 ~/.bashrc 文件中新增export AIRFLOW_HOME=/data/airflow 然后source 文件生效。
-
輸入
airflow首次輸入命令會在airflow的目錄生成文件。 -
修改配置文件:查看airflow.cfg文件,整個文件分為core、cli、api、operators、webserver、email、smtp、celery、scheduler、mesos、kerberos、github_enterprise、admin幾個部分。
對其中一些參數做修改,其它的保持默認值即可:-
[core]中
executor = SequentialExecutor修改為executor = CeleryExecutor;后面用到celery -
[core]中
sql_alchemy_conn = sqlite:////etl/prod/airflow/airflow.db修改為sql_alchemy_conn = mysql://user:password@localhost:3306/airflow;使用MySQL數據庫 -
[core]中
load_examples = True修改為load_examples = False;不使用示例 -
[core]中fernet_key = ; 利用腳本生成,粘貼進去。
-
[webserver]中
base_url =修改為base_url = http://your-ip:8080;修改為服務器ip地址 -
[webserver]中
authenticate = False修改並新增authenticate = True,auth_backend = airflow.contrib.auth.backends.password_auth;web頁面設置用戶密碼登錄。 -
[smtp]中 修改郵件配置
-
[celery]中修改broker:
# broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow修改為broker_url = amqp://rabbitmq用戶名:rabbitmq密碼@localhost:5672/rabbitmq設置的虛擬主機名=>broker_url = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq -
[celery]中修改celery_result_backend。
# celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow]修改為celery_result_backend = amqp://rabbitmq用戶名:rabbitmq密碼@localhost:5672/rabbitmq設置的虛擬主機名,
=>celery_result_backend = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq
-
# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key) # your fernet_key, keep it in secured place!
啟動airflow
-
初始化數據庫
airflow initdb -
創建用戶:腳本創建。
# -*- coding: utf-8 -*-
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin'
user.email = 'your-username'
user.password = 'your-password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
啟動airflow [暫時不要啟動下面命令,配置文件中已修改完畢,要使用celery。]
airflow webserver -p 8080
后台啟動: nohup airflow webserver -p 8080 &
airflow scheduler
后台啟動:airflow scheduler
執行測試任務
在$AIRFLOW_HOME目錄中創建dags目錄。並將測試dag文件tutorial.py放入dags目錄中。
# -*- coding: utf-8 -*-
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 05, 10),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval="*/10 * * * *")
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
至此,基本安裝完成。下面使用
celery
安裝celery
celery是一個分布式消息隊列,在airflow中,使用celeryExecutor可以動態的增加worker個數並將任務在遠程機器上執行.生產中建議使用celeryExecutor來執行.
安裝celery模塊
pip install airflow[celery]
安裝celery broker
celery需要設置broker和result隊列(可以用同樣的)來保存消息.celery 支持多種broker:
安裝airflow的RabbitMQ模塊 :celery可以使用RabbitMQ或者redias等做為broker,甚至可以使用一些Experimental(實驗性的)工具(如sqlalchemy支持的數據庫),默認使用RabbitMQ.
當前可以使用RabbitMQ 和 Redis兩種方式
- 【官方推薦RabbitMQ】
- 經使用redis在使用過程中會報錯json數據無法存入redis等相關問題
主要介紹使用RabbitMQ作為broker
-
安裝airflow的celery和rabbitmq組件
pip install airflow[rabbitmq]-
安裝
pip install airflow[rabbitmq]過程中,會報錯沒有librabbitmq庫,需要自己編譯安裝。
librabbitmq庫的獲取需要對RabbitMQ-c編譯得到。- 下載
CMake下載地址CMake 解壓, - 進入
CMake解壓后的目錄
./bootstrap && make && make install - 下載rabbitmq-c。下載地址rabbitmq-c;
4 解壓rabbitmq-c,進入解壓后的目錄;
mkdir build && cd build[# 這一步是在rabbitmq-c的根目錄下創建一個build子目錄] cmake ..[# 這一步是讓cmake根據../CMakeList.txt,即rabbitmq-c的根目錄下的CMakeList.txt創建Makefile文件,Makefile文件會被創建到build目錄中]cmake --build .[# 這一步是真正的build rabbitmq-c庫的,注意,不要漏掉結尾的點 '.']- 會在build目錄下生產librabbitmq目錄,相關的庫文件
librabbitmq.so*在該目錄中,將其放入anaconda3的lib目錄中,即/usr/local/anaconda3/lib/ - 通過編譯
rabbitmq-c獲取了librabbitmq.so庫文件就可以安裝librabbitmq模塊了
pip install librabbitmq
- 下載
-
安裝
RabbitMQ-server;在安裝RabbitMQ-server之前需要安裝erlang的支持。- 確保
yum有epel支持。yum install epel* - 安裝
erlang的基礎解決方案- 下載
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm - 安裝
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm或者yum install erlang-solutions-1.0-1.noarch.rpm - 添加源:
rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc - Add the following lines to some file in "/etc/yum.repos.d/":
- 下載
- 確保
-
[erlang-solutions]
name=CentOS $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1
5. 安裝erlang `sudo yum install erlang`
- 安裝RabbitMQ-server
1. 在[RabbitMQ-server](https://www.rabbitmq.com/install-rpm.html)官網下載安裝包。
2. 安裝下載的安裝包例如:`yum install rabbitmq-server-3.6.1-1.noarch.rpm`
- 配置rabbitmq
- 啟動rabbitmq: `rabbitmq-server -detached`
- 開機啟動rabbitmq: `chkconfig rabbitmq-server on`
- 配置rabbitmq(REF)創建用戶環境等等
rabbitmqctl add_user admin rabbitmq
rabbitmqctl add_vhost airflow-rabbitmq
rabbitmqctl set_user_tags admin airflow-rabbitmq
rabbitmqctl set_permissions -p airflow-rabbitmq admin ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management # no usage
> # 用戶user: admin
> # 密碼passwd: rabbitmq
> # 虛擬主機: airflow-rabbitmq
- 修改airflow配置文件支持Celery [提前在上文中配置文件中已經配好了]
系統啟停命令
進入虛擬環境:
source activate airflow
啟動:
nohup airflow webserver -p 1111 &
airflow scheduler -D
airflow worker -D
airflow flower -D
關閉:
ps aux | grep airflow | awk '{print $2}' | xargs sudo kill -9
ps aux | grep celeryd | awk '{print $2}' | xargs sudo kill -9
cd /etl/prod/airflow && rm *.pid -f
退出虛擬環境:
source deactivate airflow
解決的一些問題:
-
創建用戶腳本運行后報錯(“AttributeError: can't set attribute”)。
- 原因是 sqlalchemy這個Python包版本大於1.2;
pip uninstall sqlalchemy pip install 'sqlalchemy<1.2' -
webserver界面顯示的是UTC時間。如何修改為本地時間。
- 這里修改的是頁面時間。1.9版本系統默認的是UTC時間,系統內部暫無不好改,必須要改源碼。
vim /usr/local/lib/python2.7/site-packages/airflow/www/templates/admin/master.html //var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000); var UTCseconds = x.getTime(); -
webserver啟動后,頁面經常時不時的出現
Internal Server Error500錯誤 -
同時MySQL數據庫大量出現錯誤
[Note] Aborted connection 683 to db: 'airflow' user: 'admin' host: 'localhost' (Got an error reading communication packets)- 對於告警日志, 第一條應該是程序還是sqlalchemy中未mysql_close()所以有這個告警, 通過設置wait_timeout為60秒后, 這種未正常關閉的連接sleep后自動關閉;
- set @@global.log_warnings=1;
