The current production workload consists of two kinds of tasks: sqoop tasks and hive computation tasks. Both are tested here, once via a shell script and once by running the command directly.
The table used in this test is airflow.code_library.
1. Testing sqoop tasks
1.1 Full extraction
1.1.1 Running the command directly
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 23),
}
dag = DAG('sqoop4', default_args=default_args, schedule_interval=None)
bash_cmd = '''
sqoop import \
--connect jdbc:oracle:thin:@//XX.XX.XX.XX/aaaa \
--username bbbb --password 'cccc' \
--query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUTE7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " \
--hcatalog-database airflow \
--hcatalog-table CODE_LIBRARY \
--hcatalog-storage-stanza 'stored as ORC' \
--hive-overwrite \
--hive-delims-replacement " " -m 1
'''
t1 = BashOperator(
    task_id='sqoopshell',
    bash_command=bash_cmd,
    dag=dag)
The test succeeds and the data is loaded into the table.
1.1.2 Running a sqoop or hive task via a shell script
Although the step above works, truncating the table first would require adding another task just for the truncate command, splitting one ETL job across two tasks, which is inconvenient. Running the truncate and the import together in a shell script avoids this.
1) Create the DAG
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 23)
}
dag = DAG('sqoop7', default_args=default_args, schedule_interval=None)
bash_cmd = 'sh /home/airflow/sqoop3.sh'
t1 = BashOperator(
    task_id='sqoop7',
    bash_command=bash_cmd,
    dag=dag)
2) Create the shell script
hive -e "truncate table airflow.CODE_LIBRARY"
sqoop import \
--connect jdbc:oracle:thin:@//AAAA/BBB \
--username CCC --password 'DDD' \
--query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUTE7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " \
--hcatalog-database airflow \
--hcatalog-table CODE_LIBRARY \
--hcatalog-storage-stanza 'stored as ORC' \
--hive-overwrite \
--hive-delims-replacement " " -m 1
Distribute these files to the scheduler and worker nodes, then run the task.
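For example, the DAG can be triggered from the command line (assuming the Airflow 1.x CLI in use at the time; triggering it from the web UI works as well):
airflow trigger_dag sqoop7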
Checking the log shows the following error:
…………
[2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask: File "/opt/anaconda2/lib/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
[2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask: raise TemplateNotFound(template)
[2017-05-24 10:55:52,854] {base_task_runner.py:95} INFO - Subtask: jinja2.exceptions.TemplateNotFound: sh /home/airflow/sqoop3.sh
This is a known Airflow quirk: by default the bash_command of a task is rendered as a Jinja2 template, and a string ending in .sh is treated as the name of a template file to load, which cannot be found.
Changing
bash_cmd = 'sh /home/airflow/sqoop3.sh'
to
bash_cmd = '{{"sh /home/airflow/sqoop3.sh"}}'
fixes the problem and the test succeeds. Alternatively, using:
bash_cmd = '''
sh /home/airflow/sqoop3.sh
'''
also runs successfully.
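Another workaround mentioned in the Airflow documentation for BashOperator (not tested here) is to leave a trailing space after the script path, so the string is no longer treated as a template file name:
bash_cmd = 'sh /home/airflow/sqoop3.sh '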
1.2 Incremental extraction
Create a new DAG, sqoop8.
dag = DAG('sqoop8', default_args=default_args, schedule_interval=None)
bash_cmd = '''
sh /home/airflow/sqoop4.sh %s
''' % '2017-05-24'
t1 = BashOperator(
    task_id='sqoop8',
    bash_command=bash_cmd,
    dag=dag)
Create the shell script:
hive -e "alter table airflow.ACCT_FEE_ARCH drop partition(p_day='$1');"
sqoop import --connect jdbc:oracle:thin:@//AAA/BBB --username CCC --password 'DDD' \
--query " select SERIALNO, \
……
to_char(SYNCHDATE, 'YYYY-MM-DD HH24:mi:ss') as SYNCHDATE , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as ETL_IN_DT \
from XDGL.ACCT_FEE_ARCH \
where SYNCHDATE < (TO_DATE('$1', 'YYYY-MM-DD') +1) and SYNCHDATE >= (TO_DATE('$1', 'YYYY-MM-DD')) and \$CONDITIONS " \
--hcatalog-database airflow \
--hcatalog-table ACCT_FEE_ARCH \
--hcatalog-storage-stanza 'stored as ORC' \
--hive-partition-key p_day --hive-partition-value $1 \
--hive-delims-replacement " " -m 1
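In this test the extract date is hard-coded. For a scheduled run it would more naturally come from Airflow itself; the following is a minimal sketch (the DAG id sqoop8_ds is made up, the shell script is the same sqoop4.sh as above) using the built-in {{ ds }} template variable, which BashOperator renders to the run's execution date in YYYY-MM-DD format:

dag = DAG('sqoop8_ds', default_args=default_args, schedule_interval='@daily')
# bash_command is a Jinja2 template, so {{ ds }} is replaced at run time
bash_cmd = '''
sh /home/airflow/sqoop4.sh {{ ds }}
'''
t1 = BashOperator(
    task_id='sqoop8_ds',
    bash_command=bash_cmd,
    dag=dag)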
2. Testing hive tasks
The hive truncate above was run from a shell script; below, a SQL file is executed directly as a command.
Create sqoop9:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 23)
}
dag = DAG('hivesh2', default_args=default_args, schedule_interval=None)
str1 = Variable.get("str1")
bash_cmd = '''
hive -f "/home/airflow/hive1.sql" -hivevar tbname=%s
''' % str1
t1 = BashOperator(
    task_id='hivesh2',
    bash_command=bash_cmd,
    dag=dag)
Create the hive SQL file:
insert overwrite table airflow.tab_cnt select '${tbname}', count(*) from ${tbname}
Create the variable str1=airflow.ACCT_FEE_ARCH in the web UI.
The task runs successfully.
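With str1 set this way, the command that actually runs is effectively:
hive -f "/home/airflow/hive1.sql" -hivevar tbname=airflow.ACCT_FEE_ARCH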
3. Summary
1. When running a shell script, you must use Jinja2 syntax or triple quotes:
bash_cmd = '{{" sh /home/airflow/sqoop1.sh"}}' or
bash_cmd = '''
sh /home/airflow/sqoop1.sh
'''
2. Every file must be copied to every node.
Python files, shell scripts and SQL files must be copied to all webserver, scheduler and worker nodes.
3. Sometimes python fails to compile the .pyc file, and the web UI then shows only the DAG name, with no code or schedule details. In that case compile it with
python -m py_compile XXX.py
4. Once created, an Airflow DAG cannot be deleted; wrong or redundant DAGs can be set to pause mode and hidden.
5. The shell-script approach suits sqoop tasks, because truncate table / drop partition and the import can be run in one step instead of two tasks. The command approach suits hive tasks: with hive -f XXX.sql --hivevar a=%s --hivevar b=%s, parameters can be passed to hive dynamically, as sketched below.
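A minimal sketch of point 5 with two variables (the SQL file name, variable keys and task id below are made up for illustration):

from airflow.models import Variable

# hypothetical Airflow Variables holding the two hive parameters
a = Variable.get("var_a")
b = Variable.get("var_b")
bash_cmd = '''
hive -f "/home/airflow/report.sql" --hivevar a=%s --hivevar b=%s
''' % (a, b)
t1 = BashOperator(
    task_id='hive_two_vars',
    bash_command=bash_cmd,
    dag=dag)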