【DataX】Java中集成DataX開發


步驟

先說總體步驟:

  1. 下載源碼,並編譯到本地maven倉庫[上傳私服(可選)];
  2. pom文件依賴datax-core和需要的readerwriter
  3. 環境變量設置datax.home(或者利用System#setProperty(String))和一些需要替換腳本中的變量:腳本中${}占位符的變量將被系統變量替換。
  4. 將datax.tar.gz中解壓出來的confplugin等文件放到datax.home目錄中。
  5. 構造參數數組:{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
  6. 調用Engin#main(String[])或者Engine#entry(String[])

引言

目前官方的使用指南里都是利用python來調用dataX執行任務。而且現有的博客基本上也是利用java來調用python命令Runtime.getRuntime().exec()來執行。
個人感覺,dataX未提供java集成開發的方法,應該是定位生產系統,運維需要吧?!
我們的業務場景:執行完dataX的job之后,還有一定的業務邏輯,所以希望在java應用里調用dataX執行完job之后,再執行后續邏輯。

DataX分析

筆者簡單的看了一下午的DataX的邏輯,完全以使用者的視角分析DataX,必然不能完全了解DataX的整個執行過程。
本文僅分析如果能夠在java代碼里集成DataX進行開發。

集成准備

DataX沒有將代碼上傳到maven服務器上,所以需要自己先pull代碼到本地,編譯,才能在集成開發的使用通過pom引用。有條件的可以上傳到自己的私服上。
代碼地址

代碼依賴

通過pom文件加入datax-core

<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-core</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> 

如果需要對應的readerwriter的話,加入到pom文件中,比如需要streamreader和streamwriter:

<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamreader</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamwriter</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> 

要依賴datax一定要保證有對應的源碼或者編譯到本機的maven repository或者在對應的私服上有上傳相應的編譯版本,不然pom文件是找不到依賴的。

為了集成開發,可能需要一口氣引用所有的reader和writer,目前所知,就得一個一個寫,如果大家有好辦法,麻煩告知!

 

 

准備相應的文件

com.alibaba.datax.core.util.container.CoreConstant中可以看到,datax.home很重要,很多文件的讀取都是在datax.home里面獲取的。就如我們在安裝版的datax中可以看到里面一些目錄一樣

$ ll
total 4 drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp 

目前所知的,Engine#entry在解析配置的時候會讀取conf目錄下的文件,還有對應plugin/reader/xxxreader、plugin/writer/xxxwriter的plugin.json文件:

{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" } 

編寫代碼

編寫job代碼:

{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 1, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX,現在是${now}" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } } } 

寫個測試類吧:

import java.time.LocalTime; import com.alibaba.datax.core.Engine; public class EngineTest { public static void main(String[] args) { System.setProperty("datax.home", getCurrentClasspath()); System.setProperty("now", LocalTime.now().toString());// 替換job中的占位符 String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"}; try { Engine.entry(datxArgs); } catch (Throwable e) { e.printStackTrace(); } } public static String getCurrentClasspath() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); String currentClasspath = classLoader.getResource("").getPath(); // 當前操作系統 String osName = System.getProperty("os.name"); if (osName.startsWith("Windows")) { // 刪除path中最前面的/ currentClasspath = currentClasspath.substring(1); } return currentClasspath; } } 

datax在解析完配置后,會將core.json,job.json,plugin.json合並在一起:

{ "common": { "column": { "dateFormat": "yyyy-MM-dd", "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "encoding": "utf-8", "extraFormats": [ "yyyyMMdd" ], "timeFormat": "HH:mm:ss", "timeZone": "GMT+8" } }, "core": { "container": { "job": { "id": -1, "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "dataXServer": { "address": "http://localhost:7001/api", "reportDataxLog": false, "reportPerfLog": false, "timeout": 10000 }, "statistics": { "collector": { "plugin": { "maxDirtyNumber": 10, "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector" } } }, "transport": { "channel": { "byteCapacity": 67108864, "capacity": 512, "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "flowControlInterval": 20, "speed": { "byte": -1, "record": -1 } }, "exchanger": { "bufferSize": 32, "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger" } } }, "entry": { "jvm": "-Xms1G -Xmx1G" }, "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ], "sliceRecordCount": 1 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } }, "plugin": { "reader": { "streamreader": { "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "mechanism": "use datax framework to transport data from stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamreader", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader" } }, "writer": { "streamwriter": { "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", "description": { "mechanism": "use datax framework to transport data to stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamwriter", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter" } } } } 

說說插件原理

每個reader和writer都有自己的plugin.json文件,里面最重要的就是class配置了,這個類的全路徑配置用於classloader將其加載進來並通過反射將其實例化。加載代碼可看com.alibaba.datax.core.util.container.LoadUtil
所以我們在集成的時候,plugin目錄下面不需要有jar包了,只需要放json文件就行,因為我們通過pom文件依賴了對應的reader和writer,說白了,就是classpath下面有對應的reader和writer即可。

結束語

文章有點長,記錄了一個下午的研究結果,應該有很多不完善的地方,希望可以和大家多交流。如果覺得有幫助,可以點個贊。

轉自:https://www.jianshu.com/p/01672e5ea1b6 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM