步驟
先說總體步驟:
- 下載源碼,並編譯到本地
maven
倉庫[上傳私服(可選)]; pom
文件依賴datax-core
和需要的reader
和writer
- 環境變量設置
datax.home
(或者利用System#setProperty(String)
)和一些需要替換腳本中的變量:腳本中${}
占位符的變量將被系統變量替換。 - 將datax.tar.gz中解壓出來的
conf
、plugin
等文件放到datax.home目錄中。 - 構造參數數組:
{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
- 調用
Engin#main(String[])
或者Engine#entry(String[])
引言
目前官方的使用指南里都是利用python來調用dataX執行任務。而且現有的博客基本上也是利用java來調用python命令Runtime.getRuntime().exec()
來執行。
個人感覺,dataX未提供java集成開發的方法,應該是定位生產系統,運維需要吧?!
我們的業務場景:執行完dataX的job之后,還有一定的業務邏輯,所以希望在java應用里調用dataX執行完job之后,再執行后續邏輯。
DataX分析
筆者簡單的看了一下午的DataX的邏輯,完全以使用者的視角分析DataX,必然不能完全了解DataX的整個執行過程。
本文僅分析如果能夠在java代碼里集成DataX進行開發。
集成准備
DataX沒有將代碼上傳到maven服務器上,所以需要自己先pull代碼到本地,編譯,才能在集成開發的使用通過pom引用。有條件的可以上傳到自己的私服上。
代碼地址
代碼依賴
通過pom文件加入datax-core
:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-core</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
如果需要對應的reader
和writer
的話,加入到pom文件中,比如需要streamreader和streamwriter:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamreader</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamwriter</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
要依賴datax一定要保證有對應的源碼或者編譯到本機的maven repository或者在對應的私服上有上傳相應的編譯版本,不然pom文件是找不到依賴的。
為了集成開發,可能需要一口氣引用所有的reader和writer,目前所知,就得一個一個寫,如果大家有好辦法,麻煩告知!
准備相應的文件
從com.alibaba.datax.core.util.container.CoreConstant
中可以看到,datax.home
很重要,很多文件的讀取都是在datax.home
里面獲取的。就如我們在安裝版的datax中可以看到里面一些目錄一樣
$ ll
total 4 drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp
目前所知的,Engine#entry
在解析配置的時候會讀取conf目錄下的文件,還有對應plugin/reader/xxxreader、plugin/writer/xxxwriter的plugin.json文件:
{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" }
編寫代碼
編寫job代碼:
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 1, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX,現在是${now}" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } } }
寫個測試類吧:
import java.time.LocalTime; import com.alibaba.datax.core.Engine; public class EngineTest { public static void main(String[] args) { System.setProperty("datax.home", getCurrentClasspath()); System.setProperty("now", LocalTime.now().toString());// 替換job中的占位符 String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"}; try { Engine.entry(datxArgs); } catch (Throwable e) { e.printStackTrace(); } } public static String getCurrentClasspath() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); String currentClasspath = classLoader.getResource("").getPath(); // 當前操作系統 String osName = System.getProperty("os.name"); if (osName.startsWith("Windows")) { // 刪除path中最前面的/ currentClasspath = currentClasspath.substring(1); } return currentClasspath; } }
datax在解析完配置后,會將core.json,job.json,plugin.json合並在一起:
{ "common": { "column": { "dateFormat": "yyyy-MM-dd", "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "encoding": "utf-8", "extraFormats": [ "yyyyMMdd" ], "timeFormat": "HH:mm:ss", "timeZone": "GMT+8" } }, "core": { "container": { "job": { "id": -1, "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "dataXServer": { "address": "http://localhost:7001/api", "reportDataxLog": false, "reportPerfLog": false, "timeout": 10000 }, "statistics": { "collector": { "plugin": { "maxDirtyNumber": 10, "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector" } } }, "transport": { "channel": { "byteCapacity": 67108864, "capacity": 512, "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "flowControlInterval": 20, "speed": { "byte": -1, "record": -1 } }, "exchanger": { "bufferSize": 32, "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger" } } }, "entry": { "jvm": "-Xms1G -Xmx1G" }, "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ], "sliceRecordCount": 1 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1