Daily incremental sync from Oracle to ClickHouse with DataX


一. Prerequisites:

Install and configure DataX and ClickHouse.

DataX directory: /home/ad/datax/datax/

Job script directory: /home/ad/datax/datax/job

Log directory: /home/hadoop/workshell/tmp/
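For reference, a typical DataX setup follows the project's quick-start: download the release tarball, unpack it, and run the bundled self-test job. This is a sketch; the download URL is the one from the DataX README and may have changed, and the target path is assumed to match the directories above:

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz -C /home/ad/datax/
# self-test with the sample job that ships with DataX
python /home/ad/datax/datax/bin/datax.py /home/ad/datax/datax/job/job.json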

二. Script approach and implementation:

1. Write a time-based driver script (Python)

1) To do incremental loads with DataX, rows have to be filtered by some point in time, so that time must be passed into the actual datax.py run; DataX supports job variables for exactly this.

2) The increment is based on a timestamp, which is a parameter that changes on every run. My approach is to keep it in an X.record file: on each run, read the last execution time lastExecuteTime from the file and take the current time currentTime; the rows extracted are those falling between the two.

3) Below is the incremental Python driver script. For now it takes three arguments: the path of the DataX job JSON, the path of the output log, and the path of the record file holding the timestamp (the .record file does not need to be created beforehand; it is generated automatically, and by default the first run pulls all historical data):

# encoding: utf-8
# Three args: DataX job config path, log file path, record file path

import time
import sys
import os

print("going to execute")

configFilePath = sys.argv[1]
logFilePath = sys.argv[2]
lastTimeExecuteRecord = sys.argv[3]

print("==============================================")
print("configFilePath       : " + configFilePath)
print("logFilePath          : " + logFilePath)
print("lastTimeExecute File : " + lastTimeExecuteRecord)
print("==============================================")

# Read the last execution time; if the record file does not exist yet
# (first run), fall back to the epoch so that all history is extracted.
lastExecuteTime = ""
try:
    fo = open(lastTimeExecuteRecord.strip(), "r")
    lastExecuteTime = fo.read()
    fo.close()
except IOError:
    lastExecuteTime = '1970-01-01 00:00:00'
print("last time execute time: " + lastExecuteTime)

currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("currentTime is        : " + currentTime)

# Pass both timestamps to DataX as job variables via -p "-Dname=value";
# they are referenced as ${lastTime} and ${currentTime} in the job JSON.
script2execute = "python /home/hadoop/datax/bin/datax.py %s -p \"-DlastTime='%s' -DcurrentTime='%s'\" >> %s" % (configFilePath, lastExecuteTime, currentTime, logFilePath)
print("to be executed script: " + script2execute)
os.system(script2execute)
print("script execute ending")

# Update the record file with this run's start time for the next run
fo = open(lastTimeExecuteRecord.strip(), "w+")
fo.write(currentTime)
fo.close()
print("ending---" + lastTimeExecuteRecord)
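One caveat: the script above advances the timestamp even when the DataX run fails, so a failed window would be silently skipped on the next run. A minimal sketch of a safer ending for the script, reusing the variables defined above (the subprocess call and the error message are my addition, not part of the original):

import subprocess

# Run DataX and capture the exit status instead of fire-and-forget os.system
ret = subprocess.call(script2execute, shell=True)
if ret == 0:
    # Only advance the watermark when the job succeeded
    fo = open(lastTimeExecuteRecord.strip(), "w+")
    fo.write(currentTime)
    fo.close()
else:
    print("DataX job failed (exit code %d); record file left unchanged" % ret)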

4) Once the script is written, set up the DataX job JSON config accordingly.

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [{
            "reader": {
                "name": "oraclereader",
                "parameter": {
                    "username": "xx_dev1",
                    "password": "xx_dev1",
                    "connection": [{
                        "querySql": ["select PARTY_ID  as P_PEM,CREATED_STAMP as P_CREATED_STAMP from PARTY where CREATED_STAMP >= to_date('${lastTime}', 'yyyy-MM-dd HH24:mi:ss') and CREATED_STAMP< to_date('${currentTime}', 'yyyy-MM-dd HH24:mi:ss')"],
                        "jdbcUrl": ["jdbc:oracle:thin:@x.x.x.x:1521/test"]
                    }]
                }
            },
            "writer": {
                "name": "clickhousewriter",
                "parameter": {
                    "username": "xx_dev",
                    "password": "admin123!",
                    "column": ["*"],
                    "connection": [{
                        "jdbcUrl": "jdbc:clickhouse://x.x.x.x:xxxx/xx_prod",
                        "table": ["dwd_xxx_allattr"]
                    }]
                }
            }
        }]
    }
}
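The ${lastTime} and ${currentTime} placeholders in querySql are filled from the -p "-D..." arguments the driver script builds; note the half-open interval (CREATED_STAMP >= lastTime and < currentTime), which keeps boundary rows from being read twice across runs. The job can also be smoke-tested by hand with fixed timestamps (the times below are illustrative):

python /home/hadoop/datax/bin/datax.py /home/hadoop/datax/job/dwd_xxx_allattr_di.json -p "-DlastTime='2022-01-20 00:00:00' -DcurrentTime='2022-01-21 00:00:00'"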

5) Execution script:

#!/bin/sh
source ~/.bash_profile

echo "start DI dwd_xxx_allattr"
date

# incremental sync from Oracle to ClickHouse
python /home/hadoop/workshell/datax_scheduler_dwd_xxx_allattr_di.py \
    '/home/hadoop/datax/job/dwd_xxx_allattr_di.json' \
    '/home/hadoop/workshell/tmp/dwd_xxx_allattr_di.log' \
    '/home/hadoop/datax/jobTimer/record/dwd_xxx_allattr.record'

# optimize the whole table (merge parts)
sql="optimize table xx_prod.dwd_xxx_allattr FINAL;"
echo "$sql"
clickhouse client -q "${sql}"

date
echo "end DI dwd_xxx_allattr"
Content of the record file:

2022-01-21 16:13:01
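Since this is a daily incremental load, the runner script would normally be put on a schedule with cron. A sketch of a crontab entry, assuming the step-5 script is saved as /home/hadoop/workshell/run_dwd_xxx_allattr_di.sh (a hypothetical name; the original does not give one):

# hypothetical crontab entry: run the incremental sync every day at 01:00
0 1 * * * /bin/sh /home/hadoop/workshell/run_dwd_xxx_allattr_di.sh >> /home/hadoop/workshell/tmp/dwd_xxx_allattr_cron.log 2>&1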
