Airflow安裝全過程


Airflow是什么?

Airflow是一個工作流分配管理系統,通過有向非循環圖的方式管理任務流程,設置任務依賴關系和時間調度。

Airflow獨立於我們要運行的任務,只需要把任務的名字和運行方式提供給Airflow作為一個task就可以。

系統環境

  • 使用的airflow版本為airflow==1.8。注:1.9版本以上已改名為apache-airflow
  • Centos 7.x,使用系統普通用戶進行安裝以及運行。即hhjie用戶。
  • 在普通用戶hhjie中安裝anaconda3 ,並創建虛擬環境 conda create -n airflow python=2.7 指定Python虛擬環境為2.7。如果只是安裝了元數據,沒有pip,則先安裝pip,在虛擬環境中conda install pip,安裝完成后使用 pip -V檢查pip環境變量是否是虛擬環境中的。在anaconda中使用國內鏡像。在當前目錄下,輸入:
    conda config --add channels 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/'
    conda config --set show_channel_urls yes

安裝與使用。

系統依賴環境安裝:

sudo yum install gcc gcc-c++ Fernet MySQL-python sqlite-devel lxml openssl openssl-devel mysql-devel python-devel

安裝MySQL數據庫支持

pip install airflow[mysql]

MySQL安裝【自行安裝】

  • 新建用戶和數據庫
mysql> CREATE DATABASE airflow; 
# 新建用戶`admin`,密碼為`admim-airflow`, 該用戶對數據庫`airflow`有完全操作權限
	  
mysql> GRANT all privileges on airflow.* TO 'admin'@'localhost'  IDENTIFIED BY 'admin-airflow'; 
mysql> FLUSH PRIVILEGES; 
  • 缺少mysql_config 執行命令:
    ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config

安裝主模塊 airflow -->

pip install airflow
安裝數據庫模塊、密碼模塊
pip install airflow[password]

配置airflow

  • 設置環境變量

設置$AIRFLOW_HOME環境變量。首次執行airflow命令時,會在$AIRFLOW_HOME下面創建airflow的配置文件airflow.cfg。即在當前用戶的~/.bash_profile 或者 ~/.bashrc 文件中新增export AIRFLOW_HOME=/data/airflow 然后source 文件生效。

  • 輸入 airflow 首次輸入命令會在airflow的目錄生成文件。

  • 修改配置文件:查看airflow.cfg文件,整個文件分為core、cli、api、operators、webserver、email、smtp、celery、scheduler、mesos、kerberos、github_enterprise、admin幾個部分。
    對其中一些參數做修改,其它的保持默認值即可:

    1. [core]中executor = SequentialExecutor修改為executor = CeleryExecutor;后面用到celery

    2. [core]中sql_alchemy_conn = sqlite:////etl/prod/airflow/airflow.db修改為sql_alchemy_conn = mysql://user:password@localhost:3306/airflow;使用MySQL數據庫

    3. [core]中load_examples = True修改為load_examples = False;不使用示例

    4. [core]中fernet_key = ; 利用腳本生成,粘貼進去。

    5. [webserver]中base_url = 修改為base_url = http://your-ip:8080;修改為服務器ip地址

    6. [webserver]中authenticate = False修改並新增authenticate = True, auth_backend = airflow.contrib.auth.backends.password_auth;web頁面設置用戶密碼登錄。

    7. [smtp]中 修改郵件配置

    8. [celery]中修改broker:# broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow 修改為 broker_url = amqp://rabbitmq用戶名:rabbitmq密碼@localhost:5672/rabbitmq設置的虛擬主機名 => broker_url = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq

    9. [celery]中修改celery_result_backend。# celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow] 修改為 celery_result_backend = amqp://rabbitmq用戶名:rabbitmq密碼@localhost:5672/rabbitmq設置的虛擬主機名,
      => celery_result_backend = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq

# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet

fernet_key= Fernet.generate_key()
print(fernet_key) # your fernet_key, keep it in secured place!

啟動airflow

  • 初始化數據庫 airflow initdb

  • 創建用戶:腳本創建。

# -*- coding: utf-8 -*-
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'admin'
user.email = 'your-username'
user.password = 'your-password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

啟動airflow [暫時不要啟動下面命令,配置文件中已修改完畢,要使用celery。]

airflow webserver -p 8080
后台啟動: nohup airflow webserver -p 8080 &

airflow scheduler
后台啟動:airflow scheduler

執行測試任務

在$AIRFLOW_HOME目錄中創建dags目錄。並將測試dag文件tutorial.py放入dags目錄中。

# -*- coding: utf-8 -*-
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 05, 10),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval="*/10 * * * *")

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

至此,基本安裝完成。下面使用celery

安裝celery

celery是一個分布式消息隊列,在airflow中,使用celeryExecutor可以動態的增加worker個數並將任務在遠程機器上執行.生產中建議使用celeryExecutor來執行.

安裝celery模塊

pip install airflow[celery]

安裝celery broker

celery需要設置broker和result隊列(可以用同樣的)來保存消息.celery 支持多種broker:
安裝airflow的RabbitMQ模塊 :celery可以使用RabbitMQ或者redias等做為broker,甚至可以使用一些Experimental(實驗性的)工具(如sqlalchemy支持的數據庫),默認使用RabbitMQ.
當前可以使用RabbitMQRedis兩種方式

  • 【官方推薦RabbitMQ】
  • 經使用redis在使用過程中會報錯json數據無法存入redis等相關問題

主要介紹使用RabbitMQ作為broker

  • 安裝airflow的celery和rabbitmq組件

    pip install airflow[rabbitmq]

    • 安裝pip install airflow[rabbitmq]過程中,會報錯沒有librabbitmq庫,需要自己編譯安裝。
      librabbitmq庫的獲取需要對RabbitMQ-c編譯得到。

      1. 下載CMake 下載地址CMake 解壓,
      2. 進入CMake解壓后的目錄
        ./bootstrap && make && make install
      3. 下載rabbitmq-c。下載地址rabbitmq-c;
        4 解壓rabbitmq-c,進入解壓后的目錄;
        mkdir build && cd build[# 這一步是在rabbitmq-c的根目錄下創建一個build子目錄]
      4. cmake .. [# 這一步是讓cmake根據../CMakeList.txt,即rabbitmq-c的根目錄下的CMakeList.txt創建Makefile文件,Makefile文件會被創建到build目錄中]
      5. cmake --build . [# 這一步是真正的build rabbitmq-c庫的,注意,不要漏掉結尾的點 '.']
      6. 會在build目錄下生產librabbitmq目錄,相關的庫文件librabbitmq.so*在該目錄中,將其放入anaconda3lib目錄中,即/usr/local/anaconda3/lib/
      7. 通過編譯rabbitmq-c獲取了librabbitmq.so庫文件就可以安裝librabbitmq模塊了
        pip install librabbitmq
    • 安裝RabbitMQ-server;在安裝RabbitMQ-server之前需要安裝erlang的支持。

      1. 確保yum 有epel支持。yum install epel*
      2. 安裝erlang的基礎解決方案
        1. 下載wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
        2. 安裝 rpm -Uvh erlang-solutions-1.0-1.noarch.rpm或者yum install erlang-solutions-1.0-1.noarch.rpm
        3. 添加源:rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
        4. Add the following lines to some file in "/etc/yum.repos.d/":
[erlang-solutions]
name=CentOS $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1
        5. 安裝erlang `sudo yum install erlang`
- 安裝RabbitMQ-server
    1. 在[RabbitMQ-server](https://www.rabbitmq.com/install-rpm.html)官網下載安裝包。
    2. 安裝下載的安裝包例如:`yum install rabbitmq-server-3.6.1-1.noarch.rpm`

- 配置rabbitmq
    - 啟動rabbitmq: `rabbitmq-server -detached`
    - 開機啟動rabbitmq: `chkconfig rabbitmq-server on`
    - 配置rabbitmq(REF)創建用戶環境等等
rabbitmqctl add_user admin rabbitmq
rabbitmqctl add_vhost airflow-rabbitmq
rabbitmqctl set_user_tags admin airflow-rabbitmq
rabbitmqctl set_permissions -p airflow-rabbitmq admin ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management # no usage

> # 用戶user: admin
> # 密碼passwd: rabbitmq
> # 虛擬主機: airflow-rabbitmq
- 修改airflow配置文件支持Celery [提前在上文中配置文件中已經配好了]

系統啟停命令

進入虛擬環境:
source activate airflow
啟動:
nohup airflow webserver -p 1111 &
airflow scheduler -D
airflow worker -D
airflow flower -D

關閉:
ps aux | grep airflow  | awk '{print $2}' | xargs sudo kill -9
ps aux | grep celeryd  | awk '{print $2}' | xargs sudo kill -9

cd /etl/prod/airflow && rm *.pid -f

退出虛擬環境:
source deactivate airflow

解決的一些問題:

  • 創建用戶腳本運行后報錯(“AttributeError: can't set attribute”)。

    • 原因是 sqlalchemy這個Python包版本大於1.2;
        pip uninstall sqlalchemy
        pip install 'sqlalchemy<1.2'
    
  • webserver界面顯示的是UTC時間。如何修改為本地時間。

    • 這里修改的是頁面時間。1.9版本系統默認的是UTC時間,系統內部暫無不好改,必須要改源碼。
        vim /usr/local/lib/python2.7/site-packages/airflow/www/templates/admin/master.html
        //var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
        var UTCseconds = x.getTime();
    
  • webserver啟動后,頁面經常時不時的出現Internal Server Error 500錯誤

  • 同時MySQL數據庫大量出現錯誤[Note] Aborted connection 683 to db: 'airflow' user: 'admin' host: 'localhost' (Got an error reading communication packets)

    • 對於告警日志, 第一條應該是程序還是sqlalchemy中未mysql_close()所以有這個告警, 通過設置wait_timeout為60秒后, 這種未正常關閉的連接sleep后自動關閉;
    • set @@global.log_warnings=1;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM