Airflow安装全过程


Airflow是什么?

Airflow是一个工作流分配管理系统,通过有向非循环图的方式管理任务流程,设置任务依赖关系和时间调度。

Airflow独立于我们要运行的任务,只需要把任务的名字和运行方式提供给Airflow作为一个task就可以。

系统环境

  • 使用的airflow版本为airflow==1.8。注:1.9版本以上已改名为apache-airflow
  • Centos 7.x,使用系统普通用户进行安装以及运行。即hhjie用户。
  • 在普通用户hhjie中安装anaconda3 ,并创建虚拟环境 conda create -n airflow python=2.7 指定Python虚拟环境为2.7。如果只是安装了元数据,没有pip,则先安装pip,在虚拟环境中conda install pip,安装完成后使用 pip -V检查pip环境变量是否是虚拟环境中的。在anaconda中使用国内镜像。在当前目录下,输入:
    conda config --add channels 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/'
    conda config --set show_channel_urls yes

安装与使用。

系统依赖环境安装:

sudo yum install gcc gcc-c++ Fernet MySQL-python sqlite-devel lxml openssl openssl-devel mysql-devel python-devel

安装MySQL数据库支持

pip install airflow[mysql]

MySQL安装【自行安装】

  • 新建用户和数据库
mysql> CREATE DATABASE airflow; 
# 新建用户`admin`,密码为`admim-airflow`, 该用户对数据库`airflow`有完全操作权限
	  
mysql> GRANT all privileges on airflow.* TO 'admin'@'localhost'  IDENTIFIED BY 'admin-airflow'; 
mysql> FLUSH PRIVILEGES; 
  • 缺少mysql_config 执行命令:
    ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config

安装主模块 airflow -->

pip install airflow
安装数据库模块、密码模块
pip install airflow[password]

配置airflow

  • 设置环境变量

设置$AIRFLOW_HOME环境变量。首次执行airflow命令时,会在$AIRFLOW_HOME下面创建airflow的配置文件airflow.cfg。即在当前用户的~/.bash_profile 或者 ~/.bashrc 文件中新增export AIRFLOW_HOME=/data/airflow 然后source 文件生效。

  • 输入 airflow 首次输入命令会在airflow的目录生成文件。

  • 修改配置文件:查看airflow.cfg文件,整个文件分为core、cli、api、operators、webserver、email、smtp、celery、scheduler、mesos、kerberos、github_enterprise、admin几个部分。
    对其中一些参数做修改,其它的保持默认值即可:

    1. [core]中executor = SequentialExecutor修改为executor = CeleryExecutor;后面用到celery

    2. [core]中sql_alchemy_conn = sqlite:////etl/prod/airflow/airflow.db修改为sql_alchemy_conn = mysql://user:password@localhost:3306/airflow;使用MySQL数据库

    3. [core]中load_examples = True修改为load_examples = False;不使用示例

    4. [core]中fernet_key = ; 利用脚本生成,粘贴进去。

    5. [webserver]中base_url = 修改为base_url = http://your-ip:8080;修改为服务器ip地址

    6. [webserver]中authenticate = False修改并新增authenticate = True, auth_backend = airflow.contrib.auth.backends.password_auth;web页面设置用户密码登录。

    7. [smtp]中 修改邮件配置

    8. [celery]中修改broker:# broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow 修改为 broker_url = amqp://rabbitmq用户名:rabbitmq密码@localhost:5672/rabbitmq设置的虚拟主机名 => broker_url = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq

    9. [celery]中修改celery_result_backend。# celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow] 修改为 celery_result_backend = amqp://rabbitmq用户名:rabbitmq密码@localhost:5672/rabbitmq设置的虚拟主机名,
      => celery_result_backend = amqp://admin:rabbitmq@localhost:5672/airflow-rabbitmq

# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet

fernet_key= Fernet.generate_key()
print(fernet_key) # your fernet_key, keep it in secured place!

启动airflow

  • 初始化数据库 airflow initdb

  • 创建用户:脚本创建。

# -*- coding: utf-8 -*-
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'admin'
user.email = 'your-username'
user.password = 'your-password'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

启动airflow [暂时不要启动下面命令,配置文件中已修改完毕,要使用celery。]

airflow webserver -p 8080
后台启动: nohup airflow webserver -p 8080 &

airflow scheduler
后台启动:airflow scheduler

执行测试任务

在$AIRFLOW_HOME目录中创建dags目录。并将测试dag文件tutorial.py放入dags目录中。

# -*- coding: utf-8 -*-
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 05, 10),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval="*/10 * * * *")

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

至此,基本安装完成。下面使用celery

安装celery

celery是一个分布式消息队列,在airflow中,使用celeryExecutor可以动态的增加worker个数并将任务在远程机器上执行.生产中建议使用celeryExecutor来执行.

安装celery模块

pip install airflow[celery]

安装celery broker

celery需要设置broker和result队列(可以用同样的)来保存消息.celery 支持多种broker:
安装airflow的RabbitMQ模块 :celery可以使用RabbitMQ或者redias等做为broker,甚至可以使用一些Experimental(实验性的)工具(如sqlalchemy支持的数据库),默认使用RabbitMQ.
当前可以使用RabbitMQRedis两种方式

  • 【官方推荐RabbitMQ】
  • 经使用redis在使用过程中会报错json数据无法存入redis等相关问题

主要介绍使用RabbitMQ作为broker

  • 安装airflow的celery和rabbitmq组件

    pip install airflow[rabbitmq]

    • 安装pip install airflow[rabbitmq]过程中,会报错没有librabbitmq库,需要自己编译安装。
      librabbitmq库的获取需要对RabbitMQ-c编译得到。

      1. 下载CMake 下载地址CMake 解压,
      2. 进入CMake解压后的目录
        ./bootstrap && make && make install
      3. 下载rabbitmq-c。下载地址rabbitmq-c;
        4 解压rabbitmq-c,进入解压后的目录;
        mkdir build && cd build[# 这一步是在rabbitmq-c的根目录下创建一个build子目录]
      4. cmake .. [# 这一步是让cmake根据../CMakeList.txt,即rabbitmq-c的根目录下的CMakeList.txt创建Makefile文件,Makefile文件会被创建到build目录中]
      5. cmake --build . [# 这一步是真正的build rabbitmq-c库的,注意,不要漏掉结尾的点 '.']
      6. 会在build目录下生产librabbitmq目录,相关的库文件librabbitmq.so*在该目录中,将其放入anaconda3lib目录中,即/usr/local/anaconda3/lib/
      7. 通过编译rabbitmq-c获取了librabbitmq.so库文件就可以安装librabbitmq模块了
        pip install librabbitmq
    • 安装RabbitMQ-server;在安装RabbitMQ-server之前需要安装erlang的支持。

      1. 确保yum 有epel支持。yum install epel*
      2. 安装erlang的基础解决方案
        1. 下载wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
        2. 安装 rpm -Uvh erlang-solutions-1.0-1.noarch.rpm或者yum install erlang-solutions-1.0-1.noarch.rpm
        3. 添加源:rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
        4. Add the following lines to some file in "/etc/yum.repos.d/":
[erlang-solutions]
name=CentOS $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1
        5. 安装erlang `sudo yum install erlang`
- 安装RabbitMQ-server
    1. 在[RabbitMQ-server](https://www.rabbitmq.com/install-rpm.html)官网下载安装包。
    2. 安装下载的安装包例如:`yum install rabbitmq-server-3.6.1-1.noarch.rpm`

- 配置rabbitmq
    - 启动rabbitmq: `rabbitmq-server -detached`
    - 开机启动rabbitmq: `chkconfig rabbitmq-server on`
    - 配置rabbitmq(REF)创建用户环境等等
rabbitmqctl add_user admin rabbitmq
rabbitmqctl add_vhost airflow-rabbitmq
rabbitmqctl set_user_tags admin airflow-rabbitmq
rabbitmqctl set_permissions -p airflow-rabbitmq admin ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management # no usage

> # 用户user: admin
> # 密码passwd: rabbitmq
> # 虚拟主机: airflow-rabbitmq
- 修改airflow配置文件支持Celery [提前在上文中配置文件中已经配好了]

系统启停命令

进入虚拟环境:
source activate airflow
启动:
nohup airflow webserver -p 1111 &
airflow scheduler -D
airflow worker -D
airflow flower -D

关闭:
ps aux | grep airflow  | awk '{print $2}' | xargs sudo kill -9
ps aux | grep celeryd  | awk '{print $2}' | xargs sudo kill -9

cd /etl/prod/airflow && rm *.pid -f

退出虚拟环境:
source deactivate airflow

解决的一些问题:

  • 创建用户脚本运行后报错(“AttributeError: can't set attribute”)。

    • 原因是 sqlalchemy这个Python包版本大于1.2;
        pip uninstall sqlalchemy
        pip install 'sqlalchemy<1.2'
    
  • webserver界面显示的是UTC时间。如何修改为本地时间。

    • 这里修改的是页面时间。1.9版本系统默认的是UTC时间,系统内部暂无不好改,必须要改源码。
        vim /usr/local/lib/python2.7/site-packages/airflow/www/templates/admin/master.html
        //var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
        var UTCseconds = x.getTime();
    
  • webserver启动后,页面经常时不时的出现Internal Server Error 500错误

  • 同时MySQL数据库大量出现错误[Note] Aborted connection 683 to db: 'airflow' user: 'admin' host: 'localhost' (Got an error reading communication packets)

    • 对于告警日志, 第一条应该是程序还是sqlalchemy中未mysql_close()所以有这个告警, 通过设置wait_timeout为60秒后, 这种未正常关闭的连接sleep后自动关闭;
    • set @@global.log_warnings=1;


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM