1. 依賴
MySqlOperator
的數據庫交互通過 MySQLdb
模塊來實現, 使用前需要安裝相關依賴:
pip install apache-airflow[mysql]
2. 使用
使用 MySqlOperator
執行sql任務的一個簡單例子:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.mysql_operator import MySqlOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['j_hao104@163.com'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
'MySqlOperatorExample',
default_args=default_args,
description='MySqlOperatorExample',
schedule_interval="30 18 * * *")
insert_sql = "insert into log SELECT * FROM temp_log"
task = MySqlOperator(
task_id='select_sql',
sql=insert_sql,
mysql_conn_id='mysql_conn',
autocommit=True,
dag=dag)
3. 參數
MySqlOperator
接收幾個參數:
sql
: 待執行的sql語句;mysql_conn_id
: mysql數據庫配置ID, Airflow的conn配置有兩種配置方式,一是通過os.environ
來配置環境變量實現,二是通過web界面配置到代碼中,具體的配置方法會在下文描述;parameters
: 相當於MySQLdb
庫的execute
方法的第二參數,比如:cur.execute('insert into UserInfo values(%s,%s)',('alex',18))
;autocommit
: 自動執行commit
;database
: 用於覆蓋conn
配置中的數據庫名稱, 這樣方便於連接統一個mysql的不同數據庫;
4. conn
配置
建議conn
配置通過web界面來配置,這樣不用硬編碼到代碼中,關於配置中的各個參數:
Conn Id
: 對應MySqlOperator
中的mysql_conn_id
;Host
: 數據庫IP地址;Schema
: 庫名, 可以被MySqlOperator
中的database
重寫;Login
: 登錄用戶名;Password
: 登錄密碼;Port
: 數據庫端口;Extra
:MySQLdb.connect
的額外參數,包含charset
、cursor
、ssl
、local_infile
其中cursor
的值的對應關系為: sscursor
—> MySQLdb.cursors.SSCursor
; dictcursor
—> MySQLdb.cursors.DictCursor
; ssdictcursor
—> MySQLdb.cursors.SSDictCursor