前言:
最近一直在做datax的增量更新,算是對datax有了一點新的認識吧。
因為公司需要以greenplum為核心來搭建自己的數倉平台,以滿足業務上的多維快速查詢(以前多維查詢用到是kylin,但隨着數據量的增加,kylin越來越難以滿足我們的需求了)。
然而,greenplum的數據導入方面並不是很友好,通常,需要使用copy或者是gpfdist才能快速的往GP里面導入數據。
我們試了kettle來往GP導,但速度非常慢,原因是kettle導數據進入GP需要經過GP的master,是一條一條insert進去的。試了datax,同樣是速度奇慢。
所以我們采用了別人開發好的datax版本,https://github.com/HashDataInc/DataX ,接下來我對datax增量更新所做的,都是在該版本的基礎上來實現的。
datax簡介
datax是阿里開源的一個etl工具,支持多種異構數據源。當然,datax適合用於離線數據的同步,不適合實時同步。具體的介紹,網上有不少博客,這里就不多介紹了
datax增量更新
思路:
第一種思路:datax的job是以一個json文件來描述的,本身提供了where條件,支持簡單的增量更新
第二種思路:一旦我們的job中,增量抽取的數據比較復雜,比如,本身sql中需要多表關聯或者有多個子查詢,此時where條件已經無法滿足。這種情況就是我接下來要講的。
首先,為什么說如果抽取數據的sql比較復雜,where條件就無法滿足
以mysqlreader舉例,datax支持普通配置和用戶自定義配置兩種
普通配置需要用戶配置Table、Column、Where的信息,而用戶自定義配置則比較簡單粗暴,只需要配置querySql信息,兩者的模版如下:
普通配置:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
自定義配置:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [],
"querySql": []
}
],
"password": "",
"username": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
而且,阿里官方githup上有這樣的描述:
對於用戶配置Table、Column、Where的信息,MysqlReader將其拼接為SQL語句發送到Mysql數據庫;對於用戶配置querySql信息,MysqlReader直接將其發送到Mysql數據庫。
當用戶配置querySql時,MysqlReader直接忽略table、column、where條件的配置,
querySql優先級大於table、column、where選項。
也就是說,querySql適用於復雜情況下,配置更靈活。
所以,一般的增量更新,我們都會采取querySql來進行
第二個問題:我怎么讓datax知道一個job是需要增量的還是全量的
很遺憾,datax本身不適合用於做增量,所以,正常的描述一個job的json中,我們沒有辦法讓datax知道job是不是增量的;
那么,唯一的比較省力的解決辦法,是在datax的基礎上做二次開發
just like this:
{
"info": {
"increment_sql": [
"SELECT ifnull(max(id),0) from datax_m_rec_consume"
],
"is_increment": "1"
},
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:sqlserver://******:1433;DatabaseName=sjtb"],
"querySql": ["SELECT * from m_rec_consume where id > ${last_max}"]
}
],
"password": "*******",
"username": "sa"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "*****",
"password": "****************",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://*********:3306/datacenter_dev?characterEncoding=utf8",
"table": ["datax_m_rec_consume"]
}
]
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
如上,為每一個job添加一個配置項info,代表一個job的信息
在info項中,配置了2項信息,一種是表示這個job是否是增量的is_increment,1表示是增量,0 表示全量。當然,datax本身是沒有該項的,所以,這個配置項是由我們的java代碼來讀取解析的。
當我們配置了增量的話,我們可以拿到該增量sql,在該任務執行之前,通過用戶配置的writer插件的連接信息,執行該sql,將對應的值替換掉querySql中的${last_max},並將新的json內容寫出到json文件中。
這樣,在調用datax.py腳本時,就可以執行到具體的job了。
如果我們設置了全量,那么就直接由datax執行該json任務即可
第三個問題:怎么才能利用起datax自身提供的日志
在改造完增量更新之后,突然意識到,如果用腳本將datax自身的腳本與我們的程序進行封裝,我們很難拿到datax自身提供的詳細日志。
so,修改datax.py腳本吧。
我們需要在datax執行任務之前去調用我們的java代碼,以便實現增量與全量的判斷,以及增量更新的動態修改json文件中的占位符
那么,就添加一個方法,在該方法中,發送請求調用java程序即可,返回json文件的真實路徑(如果是增量的任務,我們會將json內容寫出到一個臨時json文件中,datax實際執行的是該臨時文件的內容)
def findRealPath(path): #import pdb;pdb.set_trace() #data = {'fileName': args[0], 'flag': args[1]} data = {'path': path} data_urlencode = urllib.urlencode(data) requrl = "http://localhost:7777/job/runJob2" #if '0' == args[1]: # requrl = "http://localhost:7777/job/saveJob" #elif '0' != args[1] and '1' != args[1]: # sys.exit(RET_STATE['FAIL']) req = urllib2.Request(url=requrl, data=data_urlencode) #print req res_data = urllib2.urlopen(req) res = res_data.read() resdict = json.loads(res) #print type(resdict) file_real_path =str(resdict['data']) if 500 == resdict['code']: sys.exit(RET_STATE['FAIL']) elif 200== resdict['code']: print file_real_path #sys.exit(RET_STATE['OK']) print res return file_real_path
修改main方法:
if __name__ == "__main__": printCopyright() parser = getOptionParser() options, args = parser.parse_args(sys.argv[1:]) if options.reader is not None and options.writer is not None: generateJobConfigTemplate(options.reader,options.writer) sys.exit(RET_STATE['OK']) if len(args) != 1: parser.print_help() sys.exit(RET_STATE['FAIL']) real_path = os.path.abspath(args[0]) file_real_path =findRealPath(real_path) arglist = [] arglist.append(file_real_path) #startCommand = buildStartCommand(options, args) startCommand = buildStartCommand(options, arglist) #print startCommand child_process = subprocess.Popen(startCommand, shell=True) register_signal() (stdout, stderr) = child_process.communicate() sys.exit(child_process.returncode)
在main方法中調用我們自己寫的findRealPath方法,獲取到要執行的json文件的真實路徑后,交由datax執行。
這里有個bug,在main方法中,為了適配絕對路徑和相對路徑,我用了
real_path = os.path.abspath(args[0])
去獲取用戶傳入的json文件的絕對路徑,將這個絕對路徑傳入我們的java代碼中,然而,用jenkins去調度時發現 os.path.abspath(args[0]) 這個方法實際找到的絕對路徑是不對的,
它是以jenkins程序所在的路徑為基准,來轉變相對路徑的。所以這個還是要改一下。
具體bug如下:
我配置了:
python /home/hashdatax/datax/bin/datax.py ../job/user_defined_job/test_mysql_to_sqlserver.json
實際獲取到的路徑是:
java.io.IOException: /root/.jenkins/workspace/job/user_defined_job/test_mysql_to_sqlserver.json (沒有那個文件或目錄)
