前言
DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
- Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
- Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
- Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。

從設計之初,DataX就把異構數據源同步作為自身的使命,為了應對不同數據源的差異、同時提供一致的同步原語和擴展能力,DataX自然而然地采用了框架 + 插件 的模式:
插件只需關心數據的讀取或者寫入本身。
而同步的共性問題,比如:類型轉換、性能、統計,則交由框架來處理。
作為插件開發人員,則需要關注兩個問題:
數據源本身的讀寫數據正確性。
如何與框架溝通、合理正確地使用框架。
本文重點介紹DataX插件加載原理,對於插件的使用不再敘述,具體使用可參考
DataX-Github
DataX插件開發寶典
插件機制原理
Datax有好幾種類型的插件,每個插件都有不同的作用。
- reader, 讀插件。Reader就是屬於這種類型的
- writer, 寫插件。Writer就是屬於這種類型的
- transformer, 目前還未知
- handler, 主要用於任務執行前的准備工作和完成的收尾工作。
public enum PluginType {
//pluginType還代表了資源目錄,很難擴展,或者說需要足夠必要才擴展。先mark Handler(其實和transformer一樣),再討論
READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");
private String pluginType;
private PluginType(String pluginType) {
this.pluginType = pluginType;
}
@Override
public String toString() {
return this.pluginType;
}
}
datax.py
程序啟動入口為datax.py,這里不再細看,主要是為了完成以下功能:
- 打印DataX版權信息
- 准備配置參數
- 構建啟動命令
- 啟動Java子進程
Engine啟動流程
從datax.py文件可知,Java進程啟動入口為com.alibaba.datax.core.Engine,該類負責初始化Job或者Task的運行容器,並運行插件的Job或者Task邏輯。
插件配置加載
給ConfigParser.parse(final String jobPath)傳入job路徑,該方法組裝解析,最后返回一個Configuration對象,Configuration里解析出了reader,writer,handler等插件名稱;
/**
* 指定Job配置路徑,ConfigParser會解析Job、Plugin、Core全部信息,並以Configuration返回
*/
public static Configuration parse(final String jobPath) {
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo config優化,只捕獲需要的plugin
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
try {
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
//吞掉異常,保持log干凈。這里message足夠。
LOG.warn(String.format("插件[%s,%s]加載失敗,1s后重試... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
//
}
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}
return configuration;
}
提取完插件名稱后,會去reader目錄和writer目錄,尋找插件的位置。目前Datax只支持reader和writer插件,因為它只從這兩個目錄中尋找。如果想自己擴展其他類型插件的話,比如handler類型的, 需要修改parsePluginConfig的代碼。每個插件目錄會有一個重要的配置文件 plugin.json ,它定義了插件的名稱和對應的類,在LoadUtils類加載插件的時候會使用到。
JobContainer#start
com.alibaba.datax.core.job.JobContainer#start方法
進入start()方法的this.init()方法
進入this.jobReader = this.initJobReader(jobPluginCollector);
/**
* reader job的初始化,返回Reader.Job
*
* @return
*/
private Reader.Job initJobReader(
JobPluginCollector jobPluginCollector) {
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
// 設置reader的jobConfig
jobReader.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
// 設置reader的readerConfig
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
jobReader.setJobPluginCollector(jobPluginCollector);
jobReader.init();
classLoaderSwapper.restoreCurrentThreadClassLoader();
return jobReader;
}
上面代碼主要做了:
通過配置文件獲取插件的名稱
保存當前classLoader,並將當前線程的classLoader設置為所給對應的JarLoader
加載Reader插件的實現類
初始化Reader的參數
執行jobReader的init方法
將當前線程的類加載器設置為保存的類加載,恢復之前的線程上下文加載器
LoadUtil#loadJobPlugin
com.alibaba.datax.core.util.container.LoadUtil#loadJobPlugin
/**
* 加載JobPlugin,reader、writer都可能要加載
*
* @param pluginType
* @param pluginName
* @return
*/
public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
String pluginName) {
Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);
try {
AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
.newInstance();
jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
return jobPlugin;
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format("DataX找到plugin[%s]的Job配置.",
pluginName), e);
}
}
通過 Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);實例化對應的插件類
LoadUtil#loadPluginClass
com.alibaba.datax.core.util.container.LoadUtil#loadPluginClass
/**
* 反射出具體plugin實例
*
* @param pluginType
* @param pluginName
* @param pluginRunType
* @return
*/
@SuppressWarnings("unchecked")
private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
PluginType pluginType, String pluginName,
ContainerType pluginRunType) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
try {
return (Class<? extends AbstractPlugin>) jarLoader
.loadClass(pluginConf.getString("class") + "$"
+ pluginRunType.value());
} catch (Exception e) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
這里獲取到JarLoader,通過JarLoader的loadClass方法加載我們plugin.json配置的class
LoadUtil#getJarLoader
com.alibaba.datax.core.util.container.LoadUtil#getJarLoader
public static synchronized JarLoader getJarLoader(PluginType pluginType,
String pluginName) {
Configuration pluginConf = getPluginConf(pluginType, pluginName);
JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
pluginName));
if (null == jarLoader) {
String pluginPath = pluginConf.getString("path");
if (StringUtils.isBlank(pluginPath)) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
String.format(
"%s插件[%s]路徑非法!",
pluginType, pluginName));
}
jarLoader = new JarLoader(new String[]{pluginPath});
jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
jarLoader);
}
return jarLoader;
}
根據類型和名稱從緩存中獲取,如果沒有則去創建,首先獲取插件的路徑.比如:"path": "D:\DataX\target\datax\datax\plugin\reader\mysqlreader"
然后根據JarLoader里面的getURLs(paths)獲取插件路徑下所有的jar包。
創建單獨的JarLoader,把創建的JarLoader緩存起來。
自定義類加載器
DataX通過自定義類加載器JarLoader,提供Jar隔離的加載機制。JarLoader是Application ClassLoader的子類,插件的加載接口由LoadUtil類負責,DataX通過Thread.currentThread().setContextClassLoader在每次對插件調用前后的進行classLoader的切換實現jar隔離的加載機制。

JarLoader
JarLoader繼承URLClassLoader,擴充了可以加載目錄的功能。可以從指定的目錄下,把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path下。
/**
* 提供Jar隔離的加載機制,會把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path。
*/
public class JarLoader extends URLClassLoader {
public JarLoader(String[] paths) {
this(paths, JarLoader.class.getClassLoader());
}
public JarLoader(String[] paths, ClassLoader parent) {
super(getURLs(paths), parent);
}
private static URL[] getURLs(String[] paths) {
Validate.isTrue(null != paths && 0 != paths.length,
"jar包路徑不能為空.");
List<String> dirs = new ArrayList<String>();
for (String path : paths) {
dirs.add(path);
JarLoader.collectDirs(path, dirs);
}
List<URL> urls = new ArrayList<URL>();
for (String path : dirs) {
urls.addAll(doGetURLs(path));
}
return urls.toArray(new URL[0]);
}
private static void collectDirs(String path, List<String> collector) {
if (null == path || StringUtils.isBlank(path)) {
return;
}
File current = new File(path);
if (!current.exists() || !current.isDirectory()) {
return;
}
for (File child : current.listFiles()) {
if (!child.isDirectory()) {
continue;
}
collector.add(child.getAbsolutePath());
collectDirs(child.getAbsolutePath(), collector);
}
}
private static List<URL> doGetURLs(final String path) {
Validate.isTrue(!StringUtils.isBlank(path), "jar包路徑不能為空.");
File jarPath = new File(path);
Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
"jar包路徑必須存在且為目錄.");
/* set filter */
FileFilter jarFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar");
}
};
/* iterate all jar */
File[] allJars = new File(path).listFiles(jarFilter);
List<URL> jarURLs = new ArrayList<URL>(allJars.length);
for (int i = 0; i < allJars.length; i++) {
try {
jarURLs.add(allJars[i].toURI().toURL());
} catch (Exception e) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INIT_ERROR,
"系統加載jar包出錯", e);
}
}
return jarURLs;
}
}
ClassLoaderSwapper
com.alibaba.datax.core.util.container.ClassLoaderSwapper
用於保存着切換之前的ClassLoader,避免Jar加載沖突。
/**
* Created by jingxing on 14-8-29.
*
* 為避免jar沖突,比如hbase可能有多個版本的讀寫依賴jar包,JobContainer和TaskGroupContainer
* 就需要脫離當前classLoader去加載這些jar包,執行完成后,又退回到原來classLoader上繼續執行接下來的代碼
*/
public final class ClassLoaderSwapper {
private ClassLoader storeClassLoader = null;
private ClassLoaderSwapper() {
}
public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
return new ClassLoaderSwapper();
}
/**
* 保存當前classLoader,並將當前線程的classLoader設置為所給classLoader
*
* @param
* @return
*/
public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
this.storeClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
return this.storeClassLoader;
}
/**
* 將當前線程的類加載器設置為保存的類加載
* @return
*/
public ClassLoader restoreCurrentThreadClassLoader() {
ClassLoader classLoader = Thread.currentThread()
.getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.storeClassLoader);
return classLoader;
}
}
總結
DataX的設計思路是非常清晰的:將配置的json用到了極致;另一塊是通過URLClassLoader實現插件的熱加載。配置分系統參數(core.json,plugin.json)和任務參數(job.json),系統參數可以被覆蓋。進程啟動式掃描配置和插件目錄,加載相應的插件。其次,將邏輯分成reader/writer和框架兩部分,read/writer是可以交出去的部分,也是變化點(異構數據源訪問和數據處理),框架負責調度處理和流控。
