1. 依賴
MySqlOperator 的數據庫交互通過 MySQLdb 模塊來實現, 使用前需要安裝相關依賴:
pip install apache-airflow[mysql]
2. 使用
使用 MySqlOperator 執行sql任務的一個簡單例子:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.mysql_operator import MySqlOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['j_hao104@163.com'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
'MySqlOperatorExample',
default_args=default_args,
description='MySqlOperatorExample',
schedule_interval="30 18 * * *")
insert_sql = "insert into log SELECT * FROM temp_log"
task = MySqlOperator(
task_id='select_sql',
sql=insert_sql,
mysql_conn_id='mysql_conn',
autocommit=True,
dag=dag)
3. 參數
MySqlOperator 接收幾個參數:
sql: 待執行的sql語句;mysql_conn_id: mysql數據庫配置ID, Airflow的conn配置有兩種配置方式,一是通過os.environ來配置環境變量實現,二是通過web界面配置到代碼中,具體的配置方法會在下文描述;parameters: 相當於MySQLdb庫的execute方法的第二參數,比如:cur.execute('insert into UserInfo values(%s,%s)',('alex',18));autocommit: 自動執行commit;database: 用於覆蓋conn配置中的數據庫名稱, 這樣方便於連接統一個mysql的不同數據庫;
4. conn配置

建議conn配置通過web界面來配置,這樣不用硬編碼到代碼中,關於配置中的各個參數:
Conn Id: 對應MySqlOperator中的mysql_conn_id;Host: 數據庫IP地址;Schema: 庫名, 可以被MySqlOperator中的database重寫;Login: 登錄用戶名;Password: 登錄密碼;Port: 數據庫端口;Extra:MySQLdb.connect的額外參數,包含charset、cursor、ssl、local_infile
其中cursor的值的對應關系為: sscursor —> MySQLdb.cursors.SSCursor; dictcursor —> MySQLdb.cursors.DictCursor; ssdictcursor —> MySQLdb.cursors.SSDictCursor
