The server runs CentOS. pip and setuptools need to be installed first; make sure they are updated to recent versions.
Next, install Airflow, for example by following:
Airflow 1.8 工作流平台搭建 http://blog.csdn.net/kk185800961/article/details/78431484
airflow最簡安裝方法 centos 6.5 http://blog.csdn.net/Excaliburace/article/details/53818530
MySQL is used as the metadata database here; Airflow uses SQLite by default.
1. Create the database
# create the database and account
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;
2. Install airflow; when you need environment isolation, create a virtualenv first with virtualenv ./env
sudo pip install apache-airflow
3. pip installs Airflow into the site-packages folder under the Python path; running the command above once more will show that path:
~/anaconda2/lib/python2.7/site-packages/airflow
4. Add the following to /etc/profile, then source it:
# Airflow
export AIRFLOW_HOME=/home/lintong/software/airflow
5. Initialize the database; this step creates the airflow folder at the path above, along with some files inside it:
airflow initdb
Check whether the installation succeeded:
airflow version
6. Configure the metadata database address
cd /home/lintong/software/airflow
vim airflow.cfg
Modify the following configuration:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
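The connection string follows the standard SQLAlchemy URL layout, dialect://user:password@host:port/database. A quick standard-library sketch (illustrative only) of how this particular DSN breaks down:

```python
from urllib.parse import urlsplit

# the metadata DB address from airflow.cfg above
dsn = "mysql://airflow:airflow@localhost:3306/airflow"

parts = urlsplit(dsn)
print(parts.username)          # airflow  (DB user)
print(parts.password)          # airflow  (DB password)
print(parts.hostname)          # localhost
print(parts.port)              # 3306
print(parts.path.lstrip("/"))  # airflow  (database name)
```

Each component must match the user, password, and database created in step 1.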
7. Install the Python MySQL driver
pip install mysql-python
Then initialize the database again:
airflow initdb
8. Start the web UI
airflow webserver -p 8080
Then open http://localhost:8080/admin/ in a browser.
9. Create a dags folder under the airflow path and add a DAG Python script in it; see: [AirFlow]AirFlow使用指南三 第一個DAG示例
# -*- coding: utf-8 -*-
import airflow
import os
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['xxxxxxx'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}
# -------------------------------------------------------------------------------
# dag
dag = DAG(
    'example_print_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))


def each_content(content, path):
    return ("echo \"" + content + " " + time.asctime(time.localtime(time.time())) + "\" >> " + path)


# -------------------------------------------------------------------------------
# first operator
# print1_operator = PythonOperator(
#     task_id='print1_task',
#     python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
#     dag=dag)
print1_operator = BashOperator(
    task_id='print1_task',
    bash_command='echo 1 >> /home/lintong/test.txt',
    dag=dag)
# -------------------------------------------------------------------------------
# second operator
print2_operator = BashOperator(
    task_id='print2_task',
    bash_command='echo 2 >> /home/lintong/test.txt',
    dag=dag)
# -------------------------------------------------------------------------------
# third operator
print3_operator = BashOperator(
    task_id='print3_task',
    bash_command='echo 3 >> /home/lintong/test.txt',
    dag=dag)
# -------------------------------------------------------------------------------
# dependencies
# each_content("1", "/home/lintong/桌面/test.txt")
print2_operator.set_upstream(print1_operator)
print3_operator.set_upstream(print1_operator)

# if __name__ == "__main__":
#     dag.cli()
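The each_content helper in the script only builds a bash command string; it can be exercised on its own, without Airflow installed, to see exactly what a BashOperator would receive (a minimal sketch):

```python
import time

def each_content(content, path):
    # build a bash command that appends "<content> <timestamp>" to the file at path
    return ('echo "' + content + ' '
            + time.asctime(time.localtime(time.time())) + '" >> ' + path)

cmd = each_content("1", "/tmp/test.txt")
print(cmd)  # e.g. echo "1 Sat Jun  1 12:00:00 2024" >> /tmp/test.txt
```

Note that the timestamp is baked in at DAG-parse time, not at task-run time, which is why the commented-out PythonOperator variant above passes the *result* of the call rather than a callable.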
10. Check in the web UI that this DAG now appears.

11. When it appears, the DAG is in the off state; switch it to on and click the green triangle run button behind it.
12. Start the scheduler
airflow scheduler
13. Check the file test.txt: the lines 1, 2, 3 will appear, either in the order 1 2 3 or 1 3 2.
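That ordering follows from the dependencies alone: print2_task and print3_task each depend only on print1_task, so 1 is always written first while 2 and 3 may run in either order. A hypothetical mini-scheduler over the same dependency table illustrates this (plain Python, not Airflow code):

```python
# upstream dependencies of each task, mirroring the set_upstream() calls in the DAG above
deps = {
    "print1_task": set(),
    "print2_task": {"print1_task"},
    "print3_task": {"print1_task"},
}

order = []
done = set()
while len(order) < len(deps):
    # any task whose upstream tasks have all finished is runnable
    runnable = [t for t in deps if t not in done and deps[t] <= done]
    order.append(runnable[0])  # a real scheduler may pick any runnable task
    done.add(runnable[0])

print(order)  # print1_task first; the other two in either order
```

After the first pass only print1_task is runnable; afterwards both remaining tasks are runnable at once, which is exactly the freedom that produces 1 2 3 or 1 3 2 in test.txt.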
After installation, the following shell script can be used to start (or stop) Airflow; the web UI listens on port 8080.
#!/bin/bash
#set -x
#set -e
set -u
# Usage: ./start_airflow.sh "stop_all" or "start_all"
if [ "$1" == "stop_all" ]; then
    # list all processes, keep the airflow ones, drop the grep itself, cut out the PIDs, and kill them
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
fi
if [ "$1" == "start_all" ]; then
    cd /home/lintong/software/airflow/logs
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
    echo "Airflow started in the background"
fi
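The stop_all branch relies on a classic ps/grep/awk pipeline; the grep -v 'grep' step removes the grep process itself from the matches before awk extracts the PID column. A sketch against fabricated ps -ef style input (sample lines, not real processes):

```shell
# two fabricated ps -ef style lines: an airflow process, and the grep searching for it
printf 'lintong 1234 1 0 10:00 ? 00:00:01 airflow webserver\nlintong 5678 1 0 10:00 ? 00:00:00 grep airflow\n' \
    | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}'
# prints 1234 — only the real airflow PID survives the filters
```

Without the grep -v 'grep' filter, the script would also try to kill its own short-lived grep process.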
Python script to add a user:
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'lintong'
user.email = 'xxxxx@gmail.com'
user.password = 'XXXXX'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
To install version 1.10, use Python 3.6.
If pip3 can no longer be found:
sudo python3 -m pip install --upgrade --force-reinstall pip
If pip3 does not exist inside the virtual environment:
sudo apt-get install python3-venv
Install Python 3.6:
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install python3.6
Install pip for Python 3.6:
wget https://bootstrap.pypa.io/get-pip.py
sudo python3.6 get-pip.py
Install python-dev, otherwise you will get: error: command 'x86_64-linux-gnu-gcc' failed with exit status 1
sudo apt-get install python3.6-dev
Add AIRFLOW_HOME:
# Airflow
export AIRFLOW_HOME=/home/lintong/software/airflow
Install airflow 1.10.10:
sudo pip3.6 install --force-reinstall apache-airflow -i https://pypi.tuna.tsinghua.edu.cn/simple
Run airflow version; the airflow.cfg file will then appear automatically in the AIRFLOW_HOME directory. Edit airflow.cfg:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
Install mysqlclient, since mysql-python does not exist for Python 3:
pip3.6 install mysqlclient
Initialize the DB:
airflow initdb
