Airflow筆記-MySqlOperator使用及conn配置


1. 依賴

MySqlOperator 的數據庫交互通過 MySQLdb 模塊來實現, 使用前需要安裝相關依賴:

pip install apache-airflow[mysql]

2. 使用

使用 MySqlOperator 執行sql任務的一個簡單例子:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.mysql_operator import MySqlOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['j_hao104@163.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    'MySqlOperatorExample',
    default_args=default_args,
    description='MySqlOperatorExample',
    schedule_interval="30 18 * * *")

insert_sql = "insert into log SELECT * FROM temp_log"


task = MySqlOperator(
    task_id='select_sql',
    sql=insert_sql,
    mysql_conn_id='mysql_conn',
    autocommit=True,
    dag=dag)

3. 參數

MySqlOperator 接收幾個參數:

  • sql: 待執行的sql語句;
  • mysql_conn_id: mysql數據庫配置ID, Airflow的conn配置有兩種配置方式,一是通過os.environ來配置環境變量實現,二是通過web界面配置到代碼中,具體的配置方法會在下文描述;
  • parameters: 相當於MySQLdb庫的execute 方法的第二參數,比如: cur.execute('insert into UserInfo values(%s,%s)',('alex',18));
  • autocommit: 自動執行 commit;
  • database: 用於覆蓋conn配置中的數據庫名稱, 這樣方便於連接統一個mysql的不同數據庫;

4. conn配置

建議conn配置通過web界面來配置,這樣不用硬編碼到代碼中,關於配置中的各個參數:

  • Conn Id: 對應 MySqlOperator 中的 mysql_conn_id
  • Host: 數據庫IP地址;
  • Schema: 庫名, 可以被MySqlOperator中的database重寫;
  • Login: 登錄用戶名;
  • Password: 登錄密碼;
  • Port: 數據庫端口;
  • Extra: MySQLdb.connect的額外參數,包含charsetcursorssllocal_infile

其中cursor的值的對應關系為: sscursor —> MySQLdb.cursors.SSCursor; dictcursor —> MySQLdb.cursors.DictCursor; ssdictcursor —> MySQLdb.cursors.SSDictCursor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM