airflow deployment


Official site:

http://airflow.apache.org/installation.html

https://github.com/apache/airflow

How it works:

https://www.cnblogs.com/cord/p/9450910.html

Installation:

https://www.cnblogs.com/cord/p/9226608.html

High-availability deployment, etc.:

 https://www.jianshu.com/p/2ecef979c606

Usage, etc.:

https://www.jianshu.com/p/cbff05e3f125

Logs are at:

/tmp/scheduler.log 

 

# Deploying under a non-root account:

su airflow

Startup command:

. /home/airflow/venv/bin/activate

 

# Single-node deployment

https://www.jianshu.com/p/9bed1e3ab93b

1/ Python 3.6.5 environment

https://www.cnblogs.com/hongfeng2019/p/11250989.html

The dependencies are the same as for superset.

yum install mysql-devel
pip3 install mysqlclient

 

2/ Install MySQL

https://www.cnblogs.com/hongfeng2019/p/11279488.html

create database airflowdb;
use airflowdb;
GRANT all privileges ON airflowdb.* TO 'airflow'@'localhost' IDENTIFIED BY 'Fengfeng99&';
GRANT all privileges ON airflowdb.* TO 'airflow'@'10.52.%.%' IDENTIFIED BY 'Fengfeng&';

set @@global.explicit_defaults_for_timestamp=on;

This prevents the later error:
"Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"

 

3/ Install airflow

mkdir -p /root/airflow

vim /etc/profile

export AIRFLOW_HOME=/root/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes

source /etc/profile

 

pip install apache-airflow[all]

 

4/ Edit the configuration

 

vim /root/airflow/airflow.cfg 

sql_alchemy_conn = mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb

5/ Initialize the database

airflow initdb
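A few sanity checks after initdb (a sketch; the credentials are the ones granted above, adjust as needed):

airflow version
airflow list_dags        # a fresh install should list the example DAGs

# the metadata tables should now exist in airflowdb
mysql -uairflow -p'Fengfeng&' -h 10.52.56.119 airflowdb -e "SHOW TABLES;" | head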

 

# Cluster-mode deployment:

https://www.jianshu.com/p/bddacfd66c1f

1/ Install celery

pip install celery

2/ Install redis (redis and mysql have since been moved to UCloud-managed instances)

For running it under a non-root account, see: https://www.cnblogs.com/hongfeng2019/p/11524173.html

wget http://download.redis.io/releases/redis-4.0.11.tar.gz

tar xzf redis-4.0.11.tar.gz
mv redis-4.0.11 /opt/
cd /opt/redis-4.0.11   # the company copy lives under /hongfeng/software
make
After make finishes, six executables, including redis-server and redis-cli, are generated in the src directory.
redis-server: the redis server daemon
redis-cli: the redis command-line client

# edit redis.conf in the redis-4.0.11 directory
sed -i 's/daemonize no/daemonize yes/g' /opt/redis-4.0.11/redis.conf
sed -i 's/bind 127.0.0.1/#bind 127.0.0.1/g' /opt/redis-4.0.11/redis.conf          # allow remote connections
sed -i 's/protected-mode yes/protected-mode no/g' /opt/redis-4.0.11/redis.conf    # disable protected mode so remote clients can connect

Start the redis service:

cd src/

./redis-server ../redis.conf
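A quick connectivity check before wiring redis into airflow.cfg (protected-mode is off and bind is commented out, so no password is needed; the IP is this deployment's):

/opt/redis-4.0.11/src/redis-cli -h 10.52.56.119 -p 6379 ping    # expect: PONG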

3/ Configure airflow.cfg

# change the executor (LocalExecutor by default here) to CeleryExecutor

executor = CeleryExecutor

broker_url = redis://10.52.56.119:6379/0

result_backend = db+mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb

4/ Install the Python redis package

pip install redis

5/ Run airflow

# start the airflow webserver to test

airflow webserver -p 8080

Background (daemon) startup:

# start the webserver
# to run in the background: airflow webserver -p 8080 -D
airflow webserver -p 8080
# start the scheduler
# to run in the background: airflow scheduler -D
airflow scheduler
# start the worker
# to run in the background: airflow worker -D
# if you get "address already in use", check whether worker_log_server_port = 8793 is taken;
# if so, change it to an unused port such as 8974
airflow worker
# start flower -- optional
# to run in the background: airflow flower -D
airflow flower

If airflow flower fails with an error, run pip install flower to install flower.
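A consolidated background-start sketch combining the -D flags above (the venv path is the one used elsewhere in these notes):

. /data/venv/bin/activate
airflow webserver -p 8080 -D
airflow scheduler -D
airflow worker -D
airflow flower -D        # optional; requires pip install flower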

 

III. Multi-node high availability

We can use the third-party component airflow-scheduler-failover-controller to make the scheduler highly available.

The steps are as follows:

Download failover
1/ git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
Install it with pip
cd {AIRFLOW_FAILOVER_CONTROLLER_HOME}
pip install -e .
Initialize failover
scheduler_failover_controller init
Note: initialization appends content to airflow.cfg, so airflow must already be installed and initialized.

2/ Change the failover configuration
vim /root/airflow/airflow.cfg
scheduler_nodes_in_cluster = host1,host2   # IP addresses also work
Note: the host name can be obtained with the scheduler_failover_controller get_current_host command

# Number of times to retry starting up the scheduler before it sends an alert
retry_count_before_alerting = 5   # how many times to try restarting the scheduler on that node before failing over; can be set to 0

Test:

Shut down airflow-1 and check whether the scheduler fails over; watch the switch with tail -f /root/airflow/failover.log.
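A failover test sketch, once the controller is running on both nodes (host name and IPs are this cluster's; adjust as needed):

# kill the scheduler on the currently active node and watch the controller react
ssh airflow-1 'pkill -f "airflow scheduler"'
tail -f /root/airflow/failover.log

# confirm which node the scheduler ended up on
for h in 10.52.56.119 10.52.12.116; do
  echo "== $h =="
  ssh "$h" 'ps -ef | grep "airflow scheduler" | grep -v grep'
done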

 

3/ Set up passwordless SSH between the machines running failover. Once that is done, verify it with:
scheduler_failover_controller test_connection
Start failover
scheduler_failover_controller start

/data/venv/bin/scheduler_failover_controller start

 

Test:

Shut down the primary node, 119.

. /data/venv/bin/activate

supervisorctl status

vim /data/venv/etc/supervisord.conf

Failover kicked in, but the scheduler process was not brought back up. Investigation showed:

True high availability also requires:

1/ MHA for MySQL

2/ a Redis cluster

 

# Adding a node to the cluster:

Clone a machine with the same configuration, but in supervisor only run the failover and worker services.

 

On 119 there are no worker/scheduler processes; check with ps -ef and supervisor.

 

 

Configuration after rebuilding a server:

Change the hostname

Update /etc/hosts


cd /root/airflow

vim airflow.cfg
Change the following IPs to the new IP:
broker_url = redis://10.52.56.119:6379/0
result_backend = db+mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
sql_alchemy_conn = mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
scheduler_nodes_in_cluster = 10.52.96.246,10.52.143.191
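A sketch for swapping the old IP for the new one in airflow.cfg in one pass (OLD_IP/NEW_IP are placeholders for this rebuild; back up the file first):

OLD_IP=10.52.56.119
NEW_IP=10.52.96.246          # substitute the real new address
cp /root/airflow/airflow.cfg /root/airflow/airflow.cfg.bak
sed -i "s/${OLD_IP}/${NEW_IP}/g" /root/airflow/airflow.cfg
grep -nE "broker_url|result_backend|sql_alchemy_conn|scheduler_nodes_in_cluster" /root/airflow/airflow.cfg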

cd /data/venv/etc
vim supervisord.conf
supervisord -c /data/venv/etc/supervisord.conf     # start supervisor

cd /root/airflow/dags
git pull

 

. /data/venv/bin/activate
Service commands:
supervisorctl start [service_name]    # start a service
supervisorctl status [service_name]   # check service status

 

Stopping supervisor:

(venv) [root@airflow-1 ~]# ps -ef |grep supervisord
root 31684 1 0 Oct29 ? 00:07:07 /data/venv/bin/python3 /data/venv/bin/supervisord -c /data/venv/etc/supervisord.conf

pkill supervisord

or kill -9 31684

# supervisor's own log is at /tmp/supervisord.log

 

# Updating the environment

scp -r /opt/cloudera  10.52.41.187:/opt 

 

Changes in the airflow web UI:

1/ Admin -> Connections: change the host to node1.datalake.opay.com

 

 

2/ The echo $JAVA_HOME problem:

echo $HADOOP_HOME
/opt/cloudera/parcels/CDH/lib/hadoop

cd /opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop

In hadoop-env.sh, explicitly re-declare JAVA_HOME:

export HADOOP_MAPRED_HOME=$( ([[ ! '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce' =~ CDH_MR2_HOME ]] && echo /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce ) || echo ${CDH_MR2_HOME:-/usr/lib/hadoop-mapreduce/} )
export YARN_OPTS="-Xmx838860800 -Djava.net.preferIPv4Stack=true $YARN_OPTS"
export HADOOP_CLIENT_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS"
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64

 

3/ Log sync: copy /root/airflow/logs from node 2 to node 1, configured on the master; otherwise the web UI fails with [ERROR - Failed to open transport (tries_left=3)] because it cannot find the logs.

vim  /hongfeng/script/rsynce_airflow_log.sh

rsync -azuq -e ssh root@10.52.51.116:/root/airflow/logs/ /root/airflow/logs/ --exclude 'scheduler'  --exclude 'scheduler_failover' --exclude 'dag_processor_manager'

 

*/2 * * * * /bin/sh /hongfeng/script/rsynce_airflow_log.sh >/dev/null 2>&1
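A sketch of the sync script itself, with a simple flock guard so overlapping 2-minute runs don't pile up (the lock-file path is an assumption):

#!/bin/bash
# /hongfeng/script/rsynce_airflow_log.sh
exec 9>/var/run/rsync_airflow_log.lock
flock -n 9 || exit 0

rsync -azuq -e ssh root@10.52.51.116:/root/airflow/logs/ /root/airflow/logs/ \
  --exclude 'scheduler' --exclude 'scheduler_failover' --exclude 'dag_processor_manager'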

 

4/ The "ERROR - cannot import name 'constants'" problem

vi /data/venv/lib/python3.6/site-packages/airflow/hooks/hive_hooks.py
around line 802:

# from pyhive.hive import connect
from impala.dbapi import connect
# return connect(
#     host=db.host,
#     port=db.port,
#     auth_mechanism='PLAIN',
#     auth=auth_mechanism,
#     kerberos_service_name=kerberos_service_name,
#     username=db.login or username,
#     database=schema or db.schema or 'default')

return connect(
    host=db.host,
    port=db.port,
    auth_mechanism='PLAIN',
    user=db.login,
    password=db.password)

Check the thrift-sasl version and downgrade it if it is 0.3.0:
pip list | grep thrift-sasl
thrift-sasl 0.3.0
pip install thrift-sasl==0.2.1

 

5/ The airflow-web service won't start

Check the /root/airflow/webserver.log log to see whether login/access entries are scrolling.

A hung ("fake dead") scheduler can be traced with strace:

strace -p

nohup strace -o output.txt -T -tt -e trace=all -p 8761 &
nohup strace -o /hongfeng/output.txt -p 32536 &

 

6/ When too many DAG files are defined, the airflow scheduler node runs slowly

[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
# the default is 2; increase it (set to 20 on this cluster)
max_threads = 20

Check:

 ps -ef |grep schedu

7/ Send a WeChat alert alongside the failure email when a task fails

vi /data/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py

In the email_alert method, add:

        # wx
        import sys
        sys.path.append("/root/airflow/dags")
        from plugins.comwx import ComwxApi
        comwx = ComwxApi('wwd26d45f97ea74ad2', 'BLE_v25zCmnZaFUgum93j3zVBDK-DjtRkLisI_Wns4g', '1000011')
        comwx.postAppMessage(subject, '271')
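To verify the alert path outside of Airflow, a hedged one-off test (ComwxApi lives in the dags repo under plugins/, and the credentials are the ones patched in above):

/data/venv/bin/python - <<'EOF'
import sys
sys.path.append("/root/airflow/dags")
from plugins.comwx import ComwxApi

comwx = ComwxApi('wwd26d45f97ea74ad2', 'BLE_v25zCmnZaFUgum93j3zVBDK-DjtRkLisI_Wns4g', '1000011')
comwx.postAppMessage('airflow alert test', '271')
EOF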

8/ As the number of tasks grows, the load airflow puts on hive increases

Solution: configure a ULB that forwards to multiple hive nodes in the cluster to spread the load. For the ULB configuration see: https://www.cnblogs.com/hongfeng2019/p/11934689.html

10.52.151.166     10000
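Quick checks that HiveServer2 is reachable through the ULB VIP (the beeline path assumes the CDH parcel layout used elsewhere in these notes; adjust authentication to match the cluster):

nc -zv 10.52.151.166 10000
/opt/cloudera/parcels/CDH/bin/beeline -u "jdbc:hive2://10.52.151.166:10000/default" -e "show databases;"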

 

 

 

 

On airflow-2:

scp airflow.cfg  10.52.51.8:/root/airflow


cd /data/venv/etc
vim supervisord.conf
Delete:
[program:airflow_flower]
[program:airflow_web]

supervisord -c /data/venv/etc/supervisord.conf 

systemctl status mysqld

 

airflow day-to-day maintenance:

Contributed by colleague Zhenqian.

The big-data Airflow cluster currently has two servers deployed:

10.52.56.119

10.52.12.116

Service layout:

10.52.56.119

  • mysql

  • redis

  • airflow_failover

  • airflow_flower

  • airflow_scheduler

  • airflow_web

  • airflow_worker

10.52.12.116

  • airflow_failover

  • airflow_worker

 

Accessing the services:

ip:8080  airflow_web service

ip:5555  airflow_flower service

 

The airflow services are managed by supervisor; the day-to-day maintenance commands are as follows.

First enter the Python virtual environment:

. /data/venv/bin/activate

To exit: deactivate

Service commands:

supervisorctl start [service_name]    # start a service

supervisorctl stop [service_name]     # stop a service

supervisorctl stop all

supervisorctl restart [service_name]  # restart a service

 

The supervisor configuration file is /data/venv/etc/supervisord.conf

 

[program:airflow_web]
command=airflow webserver -p 8080
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/webserver.log

[program:airflow_scheduler]
command=airflow scheduler
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/scheduler.log

[program:airflow_worker]
command=airflow worker
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
environment=C_FORCE_ROOT=true
stdout_logfile=/root/airflow/worker.log

[program:airflow_flower]
command=airflow flower --basic_auth=admin:opay321
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/flower.log

[program:airflow_failover]
command=scheduler_failover_controller start
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/failover.log

 

 

Git sync script:

vim /root/deploy_git_airflow.sh
ansible airflow -m shell -a 'cd /root/airflow/dags && git pull'
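This could be run periodically from the master's crontab, for example (the 5-minute interval is an assumption; pulling manually after each commit also works):

*/5 * * * * /bin/sh /root/deploy_git_airflow.sh >/dev/null 2>&1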

 

airflow connection configuration:

 

