DataX插件加載原理


前言

DataX 是一個異構數據源離線同步工具,致力於實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。

DataX本身作為離線數據同步框架,采用Framework + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

  • Reader:Reader為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
  • Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
  • Framework:Framework用於連接reader和writer,作為兩者的數據傳輸通道,並處理緩沖,流控,並發,數據轉換等核心技術問題。

從設計之初,DataX就把異構數據源同步作為自身的使命,為了應對不同數據源的差異、同時提供一致的同步原語和擴展能力,DataX自然而然地采用了框架 + 插件 的模式:

插件只需關心數據的讀取或者寫入本身。
而同步的共性問題,比如:類型轉換、性能、統計,則交由框架來處理。
作為插件開發人員,則需要關注兩個問題:
數據源本身的讀寫數據正確性。
如何與框架溝通、合理正確地使用框架。
本文重點介紹DataX插件加載原理,對於插件的使用不再敘述,具體使用可參考
DataX-Github
DataX插件開發寶典

插件機制原理

Datax有好幾種類型的插件,每個插件都有不同的作用。

  • reader, 讀插件。Reader就是屬於這種類型的
  • writer, 寫插件。Writer就是屬於這種類型的
  • transformer, 目前還未知
  • handler, 主要用於任務執行前的准備工作和完成的收尾工作。
public enum PluginType {
    //pluginType還代表了資源目錄,很難擴展,或者說需要足夠必要才擴展。先mark Handler(其實和transformer一樣),再討論
    READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");

    private String pluginType;

    private PluginType(String pluginType) {
        this.pluginType = pluginType;
    }

    @Override
    public String toString() {
        return this.pluginType;
    }
}

datax.py

程序啟動入口為datax.py,這里不再細看,主要是為了完成以下功能:

  • 打印DataX版權信息
  • 准備配置參數
  • 構建啟動命令
  • 啟動Java子進程

Engine啟動流程

從datax.py文件可知,Java進程啟動入口為com.alibaba.datax.core.Engine,該類負責初始化Job或者Task的運行容器,並運行插件的Job或者Task邏輯。

插件配置加載

給ConfigParser.parse(final String jobPath)傳入job路徑,該方法組裝解析,最后返回一個Configuration對象,Configuration里解析出了reader,writer,handler等插件名稱;

    /**
     * 指定Job配置路徑,ConfigParser會解析Job、Plugin、Core全部信息,並以Configuration返回
     */
    public static Configuration parse(final String jobPath) {
        Configuration configuration = ConfigParser.parseJobConfig(jobPath);

        configuration.merge(
                ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
                false);
        // todo config優化,只捕獲需要的plugin
        String readerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        String writerPluginName = configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

        String preHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);

        String postHandlerName = configuration.getString(
                CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

        Set<String> pluginList = new HashSet<String>();
        pluginList.add(readerPluginName);
        pluginList.add(writerPluginName);

        if(StringUtils.isNotEmpty(preHandlerName)) {
            pluginList.add(preHandlerName);
        }
        if(StringUtils.isNotEmpty(postHandlerName)) {
            pluginList.add(postHandlerName);
        }
        try {
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }catch (Exception e){
            //吞掉異常,保持log干凈。這里message足夠。
            LOG.warn(String.format("插件[%s,%s]加載失敗,1s后重試... Exception:%s ", readerPluginName, writerPluginName, e.getMessage()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                //
            }
            configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
        }

        return configuration;
    }

提取完插件名稱后,會去reader目錄和writer目錄,尋找插件的位置。目前Datax只支持reader和writer插件,因為它只從這兩個目錄中尋找。如果想自己擴展其他類型插件的話,比如handler類型的, 需要修改parsePluginConfig的代碼。每個插件目錄會有一個重要的配置文件 plugin.json ,它定義了插件的名稱和對應的類,在LoadUtils類加載插件的時候會使用到。

JobContainer#start

com.alibaba.datax.core.job.JobContainer#start方法
進入start()方法的this.init()方法
進入this.jobReader = this.initJobReader(jobPluginCollector);

    /**
     * reader job的初始化,返回Reader.Job
     *
     * @return
     */
    private Reader.Job initJobReader(
            JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));

        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);

        // 設置reader的jobConfig
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));

        // 設置reader的readerConfig
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));

        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();

        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

上面代碼主要做了:
通過配置文件獲取插件的名稱
保存當前classLoader,並將當前線程的classLoader設置為所給對應的JarLoader
加載Reader插件的實現類
初始化Reader的參數
執行jobReader的init方法
將當前線程的類加載器設置為保存的類加載,恢復之前的線程上下文加載器

LoadUtil#loadJobPlugin

com.alibaba.datax.core.util.container.LoadUtil#loadJobPlugin

    /**
     * 加載JobPlugin,reader、writer都可能要加載
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                                  String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Job);

        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz
                    .newInstance();
            jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX找到plugin[%s]的Job配置.",
                            pluginName), e);
        }
    }

通過 Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
pluginType, pluginName, ContainerType.Job);實例化對應的插件類

LoadUtil#loadPluginClass

com.alibaba.datax.core.util.container.LoadUtil#loadPluginClass

    /**
     * 反射出具體plugin實例
     *
     * @param pluginType
     * @param pluginName
     * @param pluginRunType
     * @return
     */
    @SuppressWarnings("unchecked")
    private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
            PluginType pluginType, String pluginName,
            ContainerType pluginRunType) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
        try {
            return (Class<? extends AbstractPlugin>) jarLoader
                    .loadClass(pluginConf.getString("class") + "$"
                            + pluginRunType.value());
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

這里獲取到JarLoader,通過JarLoader的loadClass方法加載我們plugin.json配置的class

LoadUtil#getJarLoader

com.alibaba.datax.core.util.container.LoadUtil#getJarLoader

    public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                      String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);

        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
                pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format(
                                "%s插件[%s]路徑非法!",
                                pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                    jarLoader);
        }

        return jarLoader;
    }

根據類型和名稱從緩存中獲取,如果沒有則去創建,首先獲取插件的路徑.比如:"path": "D:\DataX\target\datax\datax\plugin\reader\mysqlreader"
然后根據JarLoader里面的getURLs(paths)獲取插件路徑下所有的jar包。
創建單獨的JarLoader,把創建的JarLoader緩存起來。

自定義類加載器

DataX通過自定義類加載器JarLoader,提供Jar隔離的加載機制。JarLoader是Application ClassLoader的子類,插件的加載接口由LoadUtil類負責,DataX通過Thread.currentThread().setContextClassLoader在每次對插件調用前后的進行classLoader的切換實現jar隔離的加載機制。

JarLoader

JarLoader繼承URLClassLoader,擴充了可以加載目錄的功能。可以從指定的目錄下,把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path下。


/**
 * 提供Jar隔離的加載機制,會把傳入的路徑、及其子路徑、以及路徑中的jar文件加入到class path。
 */
public class JarLoader extends URLClassLoader {
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar包路徑不能為空.");

        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }

        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }

        return urls.toArray(new URL[0]);
    }

    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }

        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }

        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }

            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }

    private static List<URL> doGetURLs(final String path) {
        Validate.isTrue(!StringUtils.isBlank(path), "jar包路徑不能為空.");

        File jarPath = new File(path);

        Validate.isTrue(jarPath.exists() && jarPath.isDirectory(),
                "jar包路徑必須存在且為目錄.");

		/* set filter */
        FileFilter jarFilter = new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.getName().endsWith(".jar");
            }
        };

		/* iterate all jar */
        File[] allJars = new File(path).listFiles(jarFilter);
        List<URL> jarURLs = new ArrayList<URL>(allJars.length);

        for (int i = 0; i < allJars.length; i++) {
            try {
                jarURLs.add(allJars[i].toURI().toURL());
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.PLUGIN_INIT_ERROR,
                        "系統加載jar包出錯", e);
            }
        }

        return jarURLs;
    }
}

ClassLoaderSwapper

com.alibaba.datax.core.util.container.ClassLoaderSwapper
用於保存着切換之前的ClassLoader,避免Jar加載沖突。

/**
 * Created by jingxing on 14-8-29.
 *
 * 為避免jar沖突,比如hbase可能有多個版本的讀寫依賴jar包,JobContainer和TaskGroupContainer
 * 就需要脫離當前classLoader去加載這些jar包,執行完成后,又退回到原來classLoader上繼續執行接下來的代碼
 */
public final class ClassLoaderSwapper {
    private ClassLoader storeClassLoader = null;

    private ClassLoaderSwapper() {
    }

    public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() {
        return new ClassLoaderSwapper();
    }

    /**
     * 保存當前classLoader,並將當前線程的classLoader設置為所給classLoader
     *
     * @param
     * @return
     */
    public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) {
        this.storeClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        return this.storeClassLoader;
    }

    /**
     * 將當前線程的類加載器設置為保存的類加載
     * @return
     */
    public ClassLoader restoreCurrentThreadClassLoader() {
        ClassLoader classLoader = Thread.currentThread()
                .getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.storeClassLoader);
        return classLoader;
    }
}

總結

DataX的設計思路是非常清晰的:將配置的json用到了極致;另一塊是通過URLClassLoader實現插件的熱加載。配置分系統參數(core.json,plugin.json)和任務參數(job.json),系統參數可以被覆蓋。進程啟動式掃描配置和插件目錄,加載相應的插件。其次,將邏輯分成reader/writer和框架兩部分,read/writer是可以交出去的部分,也是變化點(異構數據源訪問和數據處理),框架負責調度處理和流控。

參考

從DataX學插件式架構設計
阿里-DataX源碼解讀匯總
Datax3.0插件加載流程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM