I. Prerequisites:
Install and configure DataX and ClickHouse.
DataX directory: /home/ad/datax/datax/
Job script directory: /home/ad/datax/datax/job
Log directory: /home/hadoop/workshell/tmp/
II. Script design and implementation:
1. Write a time-based launcher script (Python)
1) To do incremental extraction with DataX, the job has to filter on some timestamp, so that timestamp must be passed into the datax.py invocation that actually runs the job. DataX supports job variables for exactly this, as sketched below.
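A variable supplied as -p "-Dname=value" on the datax.py command line can be referenced as ${name} inside the job JSON. A minimal sketch of a one-off invocation (the paths and timestamps are the ones used later in this post, so treat them as placeholders):

import os

# One-off DataX run with job variables; the -D names must match the
# ${lastTime} / ${currentTime} placeholders in the job JSON of step 4).
cmd = ("python /home/hadoop/datax/bin/datax.py "
       "/home/hadoop/datax/job/dwd_xxx_allattr_di.json "
       "-p \"-DlastTime='1970-01-01 00:00:00' -DcurrentTime='2022-01-21 16:13:01'\"")
os.system(cmd)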
2) For timestamp-based increments, the timestamp is a parameter that changes on every run. My approach is to keep it in an X.record file: on each run the script reads the last execution time (lastExecuteTime) from that file and takes the current time (currentTime), and the job extracts exactly the rows whose timestamps fall between the two.
3) Below is the incremental Python launcher script. It takes three arguments: the path to the DataX job JSON, the path for the log output, and the path of the .record file that stores the timestamp (the .record file does not need to be created by hand; it is generated automatically, and by default the first run extracts all historical data):
#encoding="utf-8" # two args , first: datax config file path, logfile path import time import sys import os print("going to execute") configFilePath = sys.argv[1] logFilePath = sys.argv[2] lastTimeExecuteRecord = sys.argv[3] print "==============================================" print "configFilePath :", configFilePath print "configFilePath :", logFilePath print "lastTimeExecute File :",lastTimeExecuteRecord print "==============================================" lastExecuteTime="" try: fo = open(lastTimeExecuteRecord.strip(), "r") lastExecuteTime = fo.read() except IOError: lastExecuteTime = '1970-01-01 00:00:00' print("last time execute time: " + lastExecuteTime) currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print("currentTime is :"+ currentTime) #os.system("python /home/hadoop/datax/bin/datax.py " + configFilePath + " --lastTime" + lastExecuteTime + " --currentTime" + currentTime + " >> " + logFilePath) script2execute = "python /home/hadoop/datax/bin/datax.py %s -p \"-DlastTime='%s' -DcurrentTime='%s'\" >> %s"%( configFilePath, lastExecuteTime, currentTime,logFilePath) print("to be excute script:"+script2execute) os.system(script2execute) print("script execute ending") # update timestamp to file fo = open(lastTimeExecuteRecord.strip(), "w+") fo.write(currentTime) fo.close() print "ending---",lastTimeExecuteRecord
4) After the script is written, adapt the DataX job JSON accordingly.
{ "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [{ "reader": { "name": "oraclereader", "parameter": { "username": "xx_dev1", "password": "xx_dev1", "connection": [{ "querySql": ["select PARTY_ID as P_PEM,CREATED_STAMP as P_CREATED_STAMP from PARTY where CREATED_STAMP >= to_date('${lastTime}', 'yyyy-MM-dd HH24:mi:ss') and CREATED_STAMP< to_date('${currentTime}', 'yyyy-MM-dd HH24:mi:ss')"], "jdbcUrl": ["jdbc:oracle:thin:@x.x.x.x:1521/test"] }] } }, "writer": { "name": "clickhousewriter", "parameter": { "username": "xx_dev", "password": "admin123!", "column": ["*"], "connection": [{ "jdbcUrl": "jdbc:clickhouse://x.x.x.x:xxxx/xx_prod", "table": ["dwd_xxx_allattr"] }] } } }] } }
5) Execution command (a wrapper shell script):
#!/bin/sh
source ~/.bash_profile

echo "start DI dwd_xxx_allattr"
date

# incremental sync from Oracle into ClickHouse via the launcher script
python /home/hadoop/workshell/datax_scheduler_dwd_xxx_allattr_di.py \
    '/home/hadoop/datax/job/dwd_xxx_allattr_di.json' \
    '/home/hadoop/workshell/tmp/dwd_xxx_allattr_di.log' \
    '/home/hadoop/datax/jobTimer/record/dwd_xxx_allattr.record'

# merge the target table after the load
sql="optimize table xx_prod.dwd_xxx_allattr FINAL;"
echo $sql
clickhouse client -q "${sql}"

date
echo "end DI dwd_xxx_allattr"
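A note on the final step: OPTIMIZE TABLE ... FINAL forces ClickHouse to merge the table's data parts immediately, and if the target table uses a ReplacingMergeTree-style engine (the engine is not shown in this post, so this is an assumption), the merge is also what collapses duplicate rows after each incremental load. On large tables this can be expensive, so it may be worth running it less frequently than the sync itself.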
The record file then contains:
2022-01-21 16:13:01
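If a full re-extraction is ever needed, delete the record file (the launcher then falls back to 1970-01-01 00:00:00), or rewind the watermark explicitly with a hypothetical maintenance snippet like:

# Rewind the watermark to the epoch; the next run will re-extract everything,
# which has the same effect as deleting the record file.
with open('/home/hadoop/datax/jobTimer/record/dwd_xxx_allattr.record', 'w') as fo:
    fo.write('1970-01-01 00:00:00')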