airflow 筆記


 

首先是一個比較好的英文網站,可能要fq:http://site.clairvoyantsoft.com/installing-and-configuring-apache-airflow/

========================================================

py3venv

在 mkdir ~/airflow/py3venv

執行:python3 -m venv ~/airflow/py3venv

進入python3 環境:source /home/airflow/py3venv/bin/activate

以后的所有操作都進到 py3venv 的python3 環境操作
=========================================================

看看是否有gcc,沒有的話需要進行安裝:

sudo yum install gcc  (后續安裝airflow如果不成功,可以再次執行,它會更新包)

=========================================================

 

(py3venv) [airflow@iZ114t000jwZ airflow]$ airflow initdb
[2018-08-22 14:30:30,472] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-08-22 14:30:30,473] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-08-22 14:30:30,474] {__init__.py:45} INFO - Using executor CeleryExecutor
DB: mysql://prodroot:***@rds8r7t55g74gz9a61d0.mysql.rds.aliyuncs.com/airflow?charset=utf8
[2018-08-22 14:30:30,724] {db.py:312} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
[2018-08-22 14:30:31,107] {models.py:189} INFO - Filling up the DagBag from /home/airflow/proj/airflow/test/dags

=========================================================

啟動服務腳本

nohup airflow webserver >>/home/airflow/airflow/logs/webserver.log &
nohup airflow worker >> /home/airflow/airflow/logs/worker.log &
nohup airflow scheduler >> /home/airflow/airflow/logs/scheduler.log &
nohup airflow flower >> /home/airflow/airflow/logs/flower.log &

 

/home/airflow/proj/airflow/test/airflow-webserver.pid

里面的數字就是webserver的process id,  kill -9 {id} 就能停掉 webserver 服務了,重啟用上面第一個

 

重啟服務器,airflow會以 /home/airflow/airflow/airflow.cfg 為准


=========================================================

修改數據庫配置

可以在~/airflow 目錄下,修改airflow.cfg配置文件,數據庫連接改為本地mysql的URL

先在mysql數據庫里新建airflow數據庫

然后修改airflow.cfg配置文件:

sql_alchemy_conn = sqlite:////root/airflow/airflow.db 改為:

sql_alchemy_conn = mysql://root:root@192.168.202.128:3306/airflow

接着重新初始化數據庫:

airflow initdb

如果成功,則在airflow數據庫表中可以看到很多表

 

如出現以下報錯,則執行:yum install MySQL-python (注意此命令為centos7系統適合)

 

 

return __import__('MySQLdb') ImportError: No modulenamedMySQLdb

=======================================================

airflow1.10 中, airflow initdb 報錯:

在Flask中連接MySQL時出現ModuleNotFoundError: No module named 'MySQLdb'錯誤,

只要在配置SQLALCHEMY_DATABASE_URI時,加上一個pymysql就可以了:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:dzd123@localhost/你的數據庫名'
,當然,前提是要已經安裝了pymysql,如果沒有裝pymysql,是會報ModuleNotFoundError: No module named 'pymysql'錯誤的,

安裝命令也很簡單:pip install pymysql

=========================================================

 問題1:解決MySQLdb ImportError: libmysqlclient.so.18錯誤

主要解決方法就是創建一個 18的軟連接

https://blog.csdn.net/haoyuxuan/article/details/45334087

 

=========================================================

 問題2:File "/home/airflow/py3env/lib/python3.6/site-packages/airflow/models.py", line 639, in set_extra

先安裝包:pip install apache-airflow[crypto]

執行:python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

將得到的字符串替換 airflow.cfg 中fernet_key 的值

這段字符串加密算法用到

=========================================================

# 在 python 中執行添加賬戶:

airflow 1.9 默認安裝的 SQLAlchemy version 1.2.2 ,下面這段python會報錯:

File "/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/hybrid.py", line 873, in set
raise AttributeError("can't set attribute")
AttributeError: can't set attribute

解決方法,執行:pip install 'sqlalchemy<1.2'

這樣才能得到加密的用戶記錄:

 

 

 


import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'nick111@163.com'
user.password = 'airflow*****'
session = settings.Session()
session.add(user)
session.commit()
session.close()

=======================================================

redis 安裝參考:https://blog.csdn.net/jiangdong2007/article/details/80890236

airflow 1.9.0要使用celery4.x, redis版本4才能與 celery4 兼容

------------

安裝 Airflow 1.10.1 時的一個錯誤:

AttributeError: 'float' object has no attribute 'items'

參考:https://github.com/celery/celery/issues/5175

pip uninstall redis

pip install redis==2.10.6

=========================================================

 rabbitmq 安裝

主要參考:https://www.linuxidc.com/Linux/2016-03/129557.htm

碰到的坑:

原因就是  erlang 和 rabbitmq各有一個 cookie 

~/.erlang.cookie

/var/lib/rabbitmq/.erlang.cookie

將其中一個修改,改成相同的。

ps -ef | grep rabbitmq 

將進程殺掉

然后重啟服務器就解決了。

 


=========================================================

Airflow中使用 sshoperator

 http://yangcongchufang.com/airflow/airflow-ssh-operator.html

問題:{ssh_operator.py:118} WARNING - /home/airflow/tmp/a.sh: line 3: hive: command not found

a.sh 內容:

echo "aaaa" > /home/airflow/tmp/aaa1.txt
hive -f /home/airflow/tmp/a.hql
echo "aaaa" > /home/airflow/tmp/aaa2.txt

明明調用成功了 a.sh,  aaa1.txt  aaa2.txt 有成功生成,但是找不到 hive 命令,而在emr服務器上直接執行 sh a.sh 卻可以成功打印出數據庫名稱列表。

解決方法:

執行:export PATH=$PATH:$HIVE_HOME/bin

在 a.sh第一行中加入:

source /etc/profile 


=========================================================

 

如出現以下報錯,則執行:yum install MySQL-python (注意此命令為centos7系統適合)

return __import__('MySQLdb') ImportError: No modulenamedMySQLdb

==========================================================

之前用的一直是airflow1.9  ,最近安裝了一遍 airflow1.10。

在airflow1.9中,subdag_operator 默認使用的executor = GetDefaultExecutor(),

而1.10中默認的是executor=SequentialExecutor()。

所以導致的結果就是在 1.9 中的subdag中可以並行執行task,而 1.10中,如果不

指定 excutor,只會逐個執行。指定方法:

from airflow.executors.celery_executor import CeleryExecutor
mysubdag = SubDagOperator(
    executor=CeleryExecutor()
    ...
)

關於這個變動,官方文檔的 tips 中有做說明: https://airflow.apache.org/concepts.html#subdags

其實是Airflow1.9中其實是有個bug,subdag中無法使用 pool 中的 slot 數控制並行任務數。

1.10中沒有做這個bug的修復,而是直接給個默認值:SequentialExecutor。這是個坑~~~-_-||

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM