DataX學習指南(二)--插件開發


DataX為什么采用插件機制?  

  從設計之初,DataX就把異構數據源同步作為自身的使命,為了應對不同數據源的差異、同時提供一致的同步原語和擴展能力,DataX自然而然地采用了框架 + 插件 的模式:

  • 插件只需關心數據的讀取或者寫入本身。
  • 而同步的共性問題,比如:類型轉換、性能、統計,則交由框架來處理。

  作為插件開發人員,則需要關注兩個問題:

  1. 數據源本身的讀寫數據正確性。
  2. 如何與框架溝通、合理正確地使用框架。

插件是如何加載的?

  框架是怎么找到插件的入口類的?框架是如何加載插件的呢?

  在每個插件的項目中,都有一個plugin.json文件,這個文件定義了插件的相關信息,包括入口類。例如:

{
    "name": "mysqlwriter", "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter", "description": "Use Jdbc connect to database, execute insert sql.", "developer": "alibaba" }
  • name: 插件名稱,大小寫敏感。框架根據用戶在配置文件中指定的名稱來搜尋插件。 十分重要 。
  • class: 入口類的全限定名稱,框架通過反射插件入口類的實例。十分重要 。
  • description: 描述信息。
  • developer: 開發人員。

如何編寫插件?

  插件開發者不用關心太多,基本只需要關注特定系統讀和寫,以及自己的代碼在邏輯上是怎樣被執行的,哪一個方法是在什么時候被調用的。在此之前,需要明確以下概念:

  • JobJob是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX數據同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。
  • TaskTask是為最大化而把Job拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job,拆分成1024個讀Task,用若干個並發執行。
  • TaskGroup: 描述的是一組Task集合。在同一個TaskGroupContainer執行下的Task集合稱之為TaskGroup
  • JobContainerJob執行器,負責Job全局拆分、調度、前置語句和后置語句等工作的工作單元。類似Yarn中的JobTracker
  • TaskGroupContainerTaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TaskTracker。

  簡而言之, Job拆分成Task,在分別在框架提供的容器中執行,插件只需要實現JobTask兩部分邏輯。(框架體系介紹見DataX學習指南(一)--基礎介紹

定制插件開發(二次開發)

  在進行二次開發之前,需要到GitHub上下載源碼。開發前將datax的開發插件的手冊認真觀看一遍,對開發有幫助的。地址:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

  下載完后,通過ideaJ打開,目錄結構如下(標准的分布式包結構),然后就是漫長的導包過程。

  

 

   通過上面的目錄結構,每類數據源都存在一對Readere和Writer。這些是開源項目已經寫好的,如我們需要二次開發,最方便的就是借鑒已有的業務邏輯,然后根據時間需求定制開發reader/writer(並非每次都得同時開發reader/writer)。

   新建一個模塊,比如我這叫testreader,然后從其他模塊復制plugin.json和plugin_job_template.json這兩個文件,並做對應修改。(我這里reader實現的是從文本讀取內容)

  

 

   然后就是引入pom.xml文件,可以將任意一個項目的pom文件復制過來,然后根據自己的時間使用情況,修改/引入自己所需的依賴文件。最后將本模塊加入到datax的父pom文件。

  Reader/Job內方法的介紹在插件開發手冊介紹得很詳細了,這里我就不累述了,直接貼上開發的TestReader 

package com.alibaba.datax.plugin.reader.testreader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.google.common.collect.Sets;

import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * jackpot
 */
public class TestReader extends Reader {
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originConfig = null;

        private List<String> path = null;

        private List<String> sourceFiles;

        private Map<String, Pattern> pattern;

        private Map<String, Boolean> isRegexPath;

        @Override
        public void init() {
            this.originConfig = this.getPluginJobConf();
            this.pattern = new HashMap<String, Pattern>();
            this.isRegexPath = new HashMap<String, Boolean>();
            this.validateParameter();
        }

        private void validateParameter() {
            // Compatible with the old version, path is a string before
            String pathInString = this.originConfig.getNecessaryValue(Key.PATH,
                    TxtFileReaderErrorCode.REQUIRED_VALUE);
            if (StringUtils.isBlank(pathInString)) {
                throw DataXException.asDataXException(
                        TxtFileReaderErrorCode.REQUIRED_VALUE,
                        "您需要指定待讀取的源目錄或文件");
            }
            if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
                path = new ArrayList<String>();
                path.add(pathInString);
            } else {
                path = this.originConfig.getList(Key.PATH, String.class);
                if (null == path || path.size() == 0) {
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.REQUIRED_VALUE,
                            "您需要指定待讀取的源目錄或文件");
                }
            }

            String encoding = this.originConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
            if (StringUtils.isBlank(encoding)) {
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                                com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
            } else {
                try {
                    encoding = encoding.trim();
                    this.originConfig
                            .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                                    encoding);
                    Charsets.toCharset(encoding);
                } catch (UnsupportedCharsetException uce) {
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.ILLEGAL_VALUE,
                            String.format("不支持您配置的編碼格式 : [%s]", encoding), uce);
                } catch (Exception e) {
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                            String.format("編碼配置異常, 請聯系我們: %s", e.getMessage()),
                            e);
                }
            }

            // column: 1. index type 2.value type 3.when type is Date, may have
            // format
            List<Configuration> columns = this.originConfig
                    .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
            // handle ["*"]
            if (null != columns && 1 == columns.size()) {
                String columnsInStr = columns.get(0).toString();
                if ("\"*\"".equals(columnsInStr) || "'*'".equals(columnsInStr)) {
                    this.originConfig
                            .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,
                                    null);
                    columns = null;
                }
            }

            if (null != columns && columns.size() != 0) {
                for (Configuration eachColumnConf : columns) {
                    eachColumnConf
                            .getNecessaryValue(
                                    com.alibaba.datax.plugin.unstructuredstorage.reader.Key.TYPE,
                                    TxtFileReaderErrorCode.REQUIRED_VALUE);
                    Integer columnIndex = eachColumnConf
                            .getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.INDEX);
                    String columnValue = eachColumnConf
                            .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.VALUE);

                    if (null == columnIndex && null == columnValue) {
                        throw DataXException.asDataXException(
                                TxtFileReaderErrorCode.NO_INDEX_VALUE,
                                "由於您配置了type, 則至少需要配置 index 或 value");
                    }

                    if (null != columnIndex && null != columnValue) {
                        throw DataXException.asDataXException(
                                TxtFileReaderErrorCode.MIXED_INDEX_VALUE,
                                "您混合配置了index, value, 每一列同時僅能選擇其中一種");
                    }
                    if (null != columnIndex && columnIndex < 0) {
                        throw DataXException.asDataXException(
                                TxtFileReaderErrorCode.ILLEGAL_VALUE, String
                                        .format("index需要大於等於0, 您配置的index為[%s]",
                                                columnIndex));
                    }
                }
            }

            // only support compress types
            String compress = this.originConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS);
            if (StringUtils.isBlank(compress)) {
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
                                null);
            } else {
                Set<String> supportedCompress = Sets
                        .newHashSet("gzip", "bzip2", "zip");
                compress = compress.toLowerCase().trim();
                if (!supportedCompress.contains(compress)) {
                    throw DataXException
                            .asDataXException(
                                    TxtFileReaderErrorCode.ILLEGAL_VALUE,
                                    String.format(
                                            "僅支持 gzip, bzip2, zip 文件壓縮格式 , 不支持您配置的文件壓縮格式: [%s]",
                                            compress));
                }
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
                                compress);
            }

            String delimiterInStr = this.originConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER);
            // warn: if have, length must be one
            if (null != delimiterInStr && 1 != delimiterInStr.length()) {
                throw DataXException.asDataXException(
                        UnstructuredStorageReaderErrorCode.ILLEGAL_VALUE,
                        String.format("僅僅支持單字符切分, 您配置的切分為 : [%s]",
                                delimiterInStr));
            }

        }

        @Override
        public void prepare() {
            LOG.debug("prepare() begin...");
            // warn:make sure this regex string
            // warn:no need trim
            for (String eachPath : this.path) {
                String regexString = eachPath.replace("*", ".*").replace("?",
                        ".?");
                Pattern patt = Pattern.compile(regexString);
                this.pattern.put(eachPath, patt);
                this.sourceFiles = this.buildSourceTargets();
            }

            LOG.info(String.format("您即將讀取的文件數為: [%s]", this.sourceFiles.size()));
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

        // warn: 如果源目錄為空會報錯,拖空目錄意圖=>空文件顯示指定此意圖
        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.debug("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();

            // warn:每個slice拖且僅拖一個文件,
            // int splitNumber = adviceNumber;
            int splitNumber = this.sourceFiles.size();
            if (0 == splitNumber) {
                throw DataXException.asDataXException(
                        TxtFileReaderErrorCode.EMPTY_DIR_EXCEPTION, String
                                .format("未能找到待讀取的文件,請確認您的配置項path: %s",
                                        this.originConfig.getString(Key.PATH)));
            }

            List<List<String>> splitedSourceFiles = this.splitSourceFiles(
                    this.sourceFiles, splitNumber);
            for (List<String> files : splitedSourceFiles) {
                Configuration splitedConfig = this.originConfig.clone();
                splitedConfig.set(Constant.SOURCE_FILES, files);
                readerSplitConfigs.add(splitedConfig);
            }
            LOG.debug("split() ok and end...");
            return readerSplitConfigs;
        }

        // validate the path, path must be a absolute path
        private List<String> buildSourceTargets() {
            // for eath path
            Set<String> toBeReadFiles = new HashSet<String>();
            for (String eachPath : this.path) {
                int endMark;
                for (endMark = 0; endMark < eachPath.length(); endMark++) {
                    if ('*' != eachPath.charAt(endMark)
                            && '?' != eachPath.charAt(endMark)) {
                        continue;
                    } else {
                        this.isRegexPath.put(eachPath, true);
                        break;
                    }
                }

                String parentDirectory;
                if (BooleanUtils.isTrue(this.isRegexPath.get(eachPath))) {
                    int lastDirSeparator = eachPath.substring(0, endMark)
                            .lastIndexOf(IOUtils.DIR_SEPARATOR);
                    parentDirectory = eachPath.substring(0,
                            lastDirSeparator + 1);
                } else {
                    this.isRegexPath.put(eachPath, false);
                    parentDirectory = eachPath;
                }
                this.buildSourceTargetsEathPath(eachPath, parentDirectory,
                        toBeReadFiles);
            }
            return Arrays.asList(toBeReadFiles.toArray(new String[0]));
        }

        private void buildSourceTargetsEathPath(String regexPath,
                String parentDirectory, Set<String> toBeReadFiles) {
            // 檢測目錄是否存在,錯誤情況更明確
            try {
                File dir = new File(parentDirectory);
                boolean isExists = dir.exists();
                if (!isExists) {
                    String message = String.format("您設定的目錄不存在 : [%s]",
                            parentDirectory);
                    LOG.error(message);
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.FILE_NOT_EXISTS, message);
                }
            } catch (SecurityException se) {
                String message = String.format("您沒有權限查看目錄 : [%s]",
                        parentDirectory);
                LOG.error(message);
                throw DataXException.asDataXException(
                        TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH, message);
            }

            directoryRover(regexPath, parentDirectory, toBeReadFiles);
        }

        private void directoryRover(String regexPath, String parentDirectory,
                Set<String> toBeReadFiles) {
            File directory = new File(parentDirectory);
            // is a normal file
            if (!directory.isDirectory()) {
                if (this.isTargetFile(regexPath, directory.getAbsolutePath())) {
                    toBeReadFiles.add(parentDirectory);
                    LOG.info(String.format(
                            "add file [%s] as a candidate to be read.",
                            parentDirectory));

                }
            } else {
                // 是目錄
                try {
                    // warn:對於沒有權限的目錄,listFiles 返回null,而不是拋出SecurityException
                    File[] files = directory.listFiles();
                    if (null != files) {
                        for (File subFileNames : files) {
                            directoryRover(regexPath,
                                    subFileNames.getAbsolutePath(),
                                    toBeReadFiles);
                        }
                    } else {
                        // warn: 對於沒有權限的文件,是直接throw DataXException
                        String message = String.format("您沒有權限查看目錄 : [%s]",
                                directory);
                        LOG.error(message);
                        throw DataXException.asDataXException(
                                TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH,
                                message);
                    }

                } catch (SecurityException e) {
                    String message = String.format("您沒有權限查看目錄 : [%s]",
                            directory);
                    LOG.error(message);
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH,
                            message, e);
                }
            }
        }

        // 正則過濾
        private boolean isTargetFile(String regexPath, String absoluteFilePath) {
            if (this.isRegexPath.get(regexPath)) {
                return this.pattern.get(regexPath).matcher(absoluteFilePath)
                        .matches();
            } else {
                return true;
            }

        }

        private <T> List<List<T>> splitSourceFiles(final List<T> sourceList,
                int adviceNumber) {
            List<List<T>> splitedList = new ArrayList<List<T>>();
            int averageLength = sourceList.size() / adviceNumber;
            averageLength = averageLength == 0 ? 1 : averageLength;

            for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) {
                end = begin + averageLength;
                if (end > sourceList.size()) {
                    end = sourceList.size();
                }
                splitedList.add(sourceList.subList(begin, end));
            }
            return splitedList;
        }

    }

    public static class Task extends Reader.Task {
        private static Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration readerSliceConfig;
        private List<String> sourceFiles;

        @Override
        public void init() {
            this.readerSliceConfig = this.getPluginJobConf();
            this.sourceFiles = this.readerSliceConfig.getList(
                    Constant.SOURCE_FILES, String.class);
        }

        @Override
        public void prepare() {

        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {

        }

        @Override
        public void startRead(RecordSender recordSender) {
            LOG.debug("start read source files...");
            for (String fileName : this.sourceFiles) {
                LOG.info(String.format("reading file : [%s]", fileName));
                InputStream inputStream;
                try {
                    inputStream = new FileInputStream(fileName);
                    UnstructuredStorageReaderUtil.readFromStream(inputStream,
                            fileName, this.readerSliceConfig, recordSender,
                            this.getTaskPluginCollector());
                    recordSender.flush();
                } catch (FileNotFoundException e) {
                    // warn: sock 文件無法read,能影響所有文件的傳輸,需要用戶自己保證
                    String message = String
                            .format("找不到待讀取的文件 : [%s]", fileName);
                    LOG.error(message);
                    throw DataXException.asDataXException(
                            TxtFileReaderErrorCode.OPEN_FILE_ERROR, message);
                }
            }
            LOG.debug("end read source files...");
        }

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM