DataX為什么采用插件機制?
從設計之初,DataX
就把異構數據源同步作為自身的使命,為了應對不同數據源的差異、同時提供一致的同步原語和擴展能力,DataX
自然而然地采用了框架
+ 插件
的模式:
- 插件只需關心數據的讀取或者寫入本身。
- 而同步的共性問題,比如:類型轉換、性能、統計,則交由框架來處理。
作為插件開發人員,則需要關注兩個問題:
- 數據源本身的讀寫數據正確性。
- 如何與框架溝通、合理正確地使用框架。
插件是如何加載的?
框架是怎么找到插件的入口類的?框架是如何加載插件的呢?
在每個插件的項目中,都有一個plugin.json
文件,這個文件定義了插件的相關信息,包括入口類。例如:
{
"name": "mysqlwriter", "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter", "description": "Use Jdbc connect to database, execute insert sql.", "developer": "alibaba" }
name
: 插件名稱,大小寫敏感。框架根據用戶在配置文件中指定的名稱來搜尋插件。 十分重要 。class
: 入口類的全限定名稱,框架通過反射插件入口類的實例。十分重要 。description
: 描述信息。developer
: 開發人員。
如何編寫插件?
插件開發者不用關心太多,基本只需要關注特定系統讀和寫,以及自己的代碼在邏輯上是怎樣被執行的,哪一個方法是在什么時候被調用的。在此之前,需要明確以下概念:
Job
:Job
是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX數據同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。Task
:Task
是為最大化而把Job
拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job
,拆分成1024個讀Task
,用若干個並發執行。TaskGroup
: 描述的是一組Task
集合。在同一個TaskGroupContainer
執行下的Task
集合稱之為TaskGroup
JobContainer
:Job
執行器,負責Job
全局拆分、調度、前置語句和后置語句等工作的工作單元。類似Yarn中的JobTrackerTaskGroupContainer
:TaskGroup
執行器,負責執行一組Task
的工作單元,類似Yarn中的TaskTracker。
簡而言之, Job
拆分成Task
,在分別在框架提供的容器中執行,插件只需要實現Job
和Task
兩部分邏輯。(框架體系介紹見DataX學習指南(一)--基礎介紹)
定制插件開發(二次開發)
在進行二次開發之前,需要到GitHub上下載源碼。開發前將datax的開發插件的手冊認真觀看一遍,對開發有幫助的。地址:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
下載完后,通過ideaJ打開,目錄結構如下(標准的分布式包結構),然后就是漫長的導包過程。
通過上面的目錄結構,每類數據源都存在一對Readere和Writer。這些是開源項目已經寫好的,如我們需要二次開發,最方便的就是借鑒已有的業務邏輯,然后根據時間需求定制開發reader/writer(並非每次都得同時開發reader/writer)。
新建一個模塊,比如我這叫testreader,然后從其他模塊復制plugin.json和plugin_job_template.json這兩個文件,並做對應修改。(我這里reader實現的是從文本讀取內容)
然后就是引入pom.xml文件,可以將任意一個項目的pom文件復制過來,然后根據自己的時間使用情況,修改/引入自己所需的依賴文件。最后將本模塊加入到datax的父pom文件。
Reader/Job內方法的介紹在插件開發手冊介紹得很詳細了,這里我就不累述了,直接貼上開發的TestReader
package com.alibaba.datax.plugin.reader.testreader; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode; import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil; import com.google.common.collect.Sets; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; /** * jackpot */ public class TestReader extends Reader { public static class Job extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originConfig = null; private List<String> path = null; private List<String> sourceFiles; private Map<String, Pattern> pattern; private Map<String, Boolean> isRegexPath; @Override public void init() { this.originConfig = this.getPluginJobConf(); this.pattern = new HashMap<String, Pattern>(); this.isRegexPath = new HashMap<String, Boolean>(); this.validateParameter(); } private void validateParameter() { // Compatible with the old version, path is a string before String pathInString = this.originConfig.getNecessaryValue(Key.PATH, TxtFileReaderErrorCode.REQUIRED_VALUE); if (StringUtils.isBlank(pathInString)) { throw DataXException.asDataXException( TxtFileReaderErrorCode.REQUIRED_VALUE, "您需要指定待讀取的源目錄或文件"); } if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) { path = new ArrayList<String>(); path.add(pathInString); } else { path = this.originConfig.getList(Key.PATH, String.class); if (null == path || path.size() == 0) { throw DataXException.asDataXException( TxtFileReaderErrorCode.REQUIRED_VALUE, "您需要指定待讀取的源目錄或文件"); } } String encoding = this.originConfig .getString( com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING, com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING); if (StringUtils.isBlank(encoding)) { this.originConfig .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING, com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING); } else { try { encoding = encoding.trim(); this.originConfig .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING, encoding); Charsets.toCharset(encoding); } catch (UnsupportedCharsetException uce) { throw DataXException.asDataXException( TxtFileReaderErrorCode.ILLEGAL_VALUE, String.format("不支持您配置的編碼格式 : [%s]", encoding), uce); } catch (Exception e) { throw DataXException.asDataXException( TxtFileReaderErrorCode.CONFIG_INVALID_EXCEPTION, String.format("編碼配置異常, 請聯系我們: %s", e.getMessage()), e); } } // column: 1. index type 2.value type 3.when type is Date, may have // format List<Configuration> columns = this.originConfig .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN); // handle ["*"] if (null != columns && 1 == columns.size()) { String columnsInStr = columns.get(0).toString(); if ("\"*\"".equals(columnsInStr) || "'*'".equals(columnsInStr)) { this.originConfig .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN, null); columns = null; } } if (null != columns && columns.size() != 0) { for (Configuration eachColumnConf : columns) { eachColumnConf .getNecessaryValue( com.alibaba.datax.plugin.unstructuredstorage.reader.Key.TYPE, TxtFileReaderErrorCode.REQUIRED_VALUE); Integer columnIndex = eachColumnConf .getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.INDEX); String columnValue = eachColumnConf .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.VALUE); if (null == columnIndex && null == columnValue) { throw DataXException.asDataXException( TxtFileReaderErrorCode.NO_INDEX_VALUE, "由於您配置了type, 則至少需要配置 index 或 value"); } if (null != columnIndex && null != columnValue) { throw DataXException.asDataXException( TxtFileReaderErrorCode.MIXED_INDEX_VALUE, "您混合配置了index, value, 每一列同時僅能選擇其中一種"); } if (null != columnIndex && columnIndex < 0) { throw DataXException.asDataXException( TxtFileReaderErrorCode.ILLEGAL_VALUE, String .format("index需要大於等於0, 您配置的index為[%s]", columnIndex)); } } } // only support compress types String compress = this.originConfig .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS); if (StringUtils.isBlank(compress)) { this.originConfig .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS, null); } else { Set<String> supportedCompress = Sets .newHashSet("gzip", "bzip2", "zip"); compress = compress.toLowerCase().trim(); if (!supportedCompress.contains(compress)) { throw DataXException .asDataXException( TxtFileReaderErrorCode.ILLEGAL_VALUE, String.format( "僅支持 gzip, bzip2, zip 文件壓縮格式 , 不支持您配置的文件壓縮格式: [%s]", compress)); } this.originConfig .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS, compress); } String delimiterInStr = this.originConfig .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER); // warn: if have, length must be one if (null != delimiterInStr && 1 != delimiterInStr.length()) { throw DataXException.asDataXException( UnstructuredStorageReaderErrorCode.ILLEGAL_VALUE, String.format("僅僅支持單字符切分, 您配置的切分為 : [%s]", delimiterInStr)); } } @Override public void prepare() { LOG.debug("prepare() begin..."); // warn:make sure this regex string // warn:no need trim for (String eachPath : this.path) { String regexString = eachPath.replace("*", ".*").replace("?", ".?"); Pattern patt = Pattern.compile(regexString); this.pattern.put(eachPath, patt); this.sourceFiles = this.buildSourceTargets(); } LOG.info(String.format("您即將讀取的文件數為: [%s]", this.sourceFiles.size())); } @Override public void post() { } @Override public void destroy() { } // warn: 如果源目錄為空會報錯,拖空目錄意圖=>空文件顯示指定此意圖 @Override public List<Configuration> split(int adviceNumber) { LOG.debug("split() begin..."); List<Configuration> readerSplitConfigs = new ArrayList<Configuration>(); // warn:每個slice拖且僅拖一個文件, // int splitNumber = adviceNumber; int splitNumber = this.sourceFiles.size(); if (0 == splitNumber) { throw DataXException.asDataXException( TxtFileReaderErrorCode.EMPTY_DIR_EXCEPTION, String .format("未能找到待讀取的文件,請確認您的配置項path: %s", this.originConfig.getString(Key.PATH))); } List<List<String>> splitedSourceFiles = this.splitSourceFiles( this.sourceFiles, splitNumber); for (List<String> files : splitedSourceFiles) { Configuration splitedConfig = this.originConfig.clone(); splitedConfig.set(Constant.SOURCE_FILES, files); readerSplitConfigs.add(splitedConfig); } LOG.debug("split() ok and end..."); return readerSplitConfigs; } // validate the path, path must be a absolute path private List<String> buildSourceTargets() { // for eath path Set<String> toBeReadFiles = new HashSet<String>(); for (String eachPath : this.path) { int endMark; for (endMark = 0; endMark < eachPath.length(); endMark++) { if ('*' != eachPath.charAt(endMark) && '?' != eachPath.charAt(endMark)) { continue; } else { this.isRegexPath.put(eachPath, true); break; } } String parentDirectory; if (BooleanUtils.isTrue(this.isRegexPath.get(eachPath))) { int lastDirSeparator = eachPath.substring(0, endMark) .lastIndexOf(IOUtils.DIR_SEPARATOR); parentDirectory = eachPath.substring(0, lastDirSeparator + 1); } else { this.isRegexPath.put(eachPath, false); parentDirectory = eachPath; } this.buildSourceTargetsEathPath(eachPath, parentDirectory, toBeReadFiles); } return Arrays.asList(toBeReadFiles.toArray(new String[0])); } private void buildSourceTargetsEathPath(String regexPath, String parentDirectory, Set<String> toBeReadFiles) { // 檢測目錄是否存在,錯誤情況更明確 try { File dir = new File(parentDirectory); boolean isExists = dir.exists(); if (!isExists) { String message = String.format("您設定的目錄不存在 : [%s]", parentDirectory); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.FILE_NOT_EXISTS, message); } } catch (SecurityException se) { String message = String.format("您沒有權限查看目錄 : [%s]", parentDirectory); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH, message); } directoryRover(regexPath, parentDirectory, toBeReadFiles); } private void directoryRover(String regexPath, String parentDirectory, Set<String> toBeReadFiles) { File directory = new File(parentDirectory); // is a normal file if (!directory.isDirectory()) { if (this.isTargetFile(regexPath, directory.getAbsolutePath())) { toBeReadFiles.add(parentDirectory); LOG.info(String.format( "add file [%s] as a candidate to be read.", parentDirectory)); } } else { // 是目錄 try { // warn:對於沒有權限的目錄,listFiles 返回null,而不是拋出SecurityException File[] files = directory.listFiles(); if (null != files) { for (File subFileNames : files) { directoryRover(regexPath, subFileNames.getAbsolutePath(), toBeReadFiles); } } else { // warn: 對於沒有權限的文件,是直接throw DataXException String message = String.format("您沒有權限查看目錄 : [%s]", directory); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH, message); } } catch (SecurityException e) { String message = String.format("您沒有權限查看目錄 : [%s]", directory); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.SECURITY_NOT_ENOUGH, message, e); } } } // 正則過濾 private boolean isTargetFile(String regexPath, String absoluteFilePath) { if (this.isRegexPath.get(regexPath)) { return this.pattern.get(regexPath).matcher(absoluteFilePath) .matches(); } else { return true; } } private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) { List<List<T>> splitedList = new ArrayList<List<T>>(); int averageLength = sourceList.size() / adviceNumber; averageLength = averageLength == 0 ? 1 : averageLength; for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) { end = begin + averageLength; if (end > sourceList.size()) { end = sourceList.size(); } splitedList.add(sourceList.subList(begin, end)); } return splitedList; } } public static class Task extends Reader.Task { private static Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration readerSliceConfig; private List<String> sourceFiles; @Override public void init() { this.readerSliceConfig = this.getPluginJobConf(); this.sourceFiles = this.readerSliceConfig.getList( Constant.SOURCE_FILES, String.class); } @Override public void prepare() { } @Override public void post() { } @Override public void destroy() { } @Override public void startRead(RecordSender recordSender) { LOG.debug("start read source files..."); for (String fileName : this.sourceFiles) { LOG.info(String.format("reading file : [%s]", fileName)); InputStream inputStream; try { inputStream = new FileInputStream(fileName); UnstructuredStorageReaderUtil.readFromStream(inputStream, fileName, this.readerSliceConfig, recordSender, this.getTaskPluginCollector()); recordSender.flush(); } catch (FileNotFoundException e) { // warn: sock 文件無法read,能影響所有文件的傳輸,需要用戶自己保證 String message = String .format("找不到待讀取的文件 : [%s]", fileName); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.OPEN_FILE_ERROR, message); } } LOG.debug("end read source files..."); } } }