Overview of the data transfer flow:
DataX is built on a plugin model: reader plugins and writer plugins are fully decoupled, so they cannot affect one another. DataX has three main parts: reader, writer, and channel. The reader and writer exchange data through the channel: the reader pushes records into the channel via recordSender.sendToWriter(), and the writer pulls records out of the channel via recordReceiver.getFromReader(). Under the hood the channel is a first-in, first-out queue (ArrayBlockingQueue).
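To make the reader → channel → writer decoupling concrete, here is a minimal, self-contained sketch (not DataX code; the class name and the terminate marker below are invented for illustration) of a producer and a consumer exchanging records through a bounded FIFO ArrayBlockingQueue, the same data structure the channel is built on:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ChannelSketch {
    // Poison pill telling the consumer the producer is done,
    // playing a role similar to DataX's TerminateRecord.
    private static final String TERMINATE = "__TERMINATE__";

    public static void main(String[] args) throws InterruptedException {
        // Bounded FIFO queue: the "channel" between reader and writer.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(32);

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    channel.put("record-" + i);   // blocks when the queue is full
                }
                channel.put(TERMINATE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread writer = new Thread(() -> {
            try {
                while (true) {
                    String record = channel.take(); // blocks when the queue is empty
                    if (TERMINATE.equals(record)) {
                        break;
                    }
                    System.out.println("write " + record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
        reader.join();
        writer.join();
    }
}

The bounded queue also gives natural back-pressure: a slow writer eventually blocks the reader instead of letting memory grow without limit.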
Job execution design:
Job: A Job describes a synchronization job from one source to one destination; it is the smallest business unit of DataX data synchronization. For example: synchronizing a MySQL table into a specific partition of an ODPS table.
Task: A Task is the smallest execution unit obtained by splitting a Job in order to maximize concurrency. For example: a Job that reads a MySQL database sharded into 1024 tables is split into 1024 read Tasks, which are then run with some degree of concurrency.
TaskGroup: Describes a set of Tasks. The Tasks executed under the same TaskGroupContainer are collectively called a TaskGroup.
JobContainer: The Job executor; the unit of work responsible for splitting the Job globally, scheduling it, and running pre- and post-statements. Similar to the JobTracker in Yarn.
TaskGroupContainer: The TaskGroup executor; the unit of work responsible for running a group of Tasks, similar to the TaskTracker in Yarn. A simplified sketch of how a Job's Tasks are distributed across TaskGroups follows this list.
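The sketch below illustrates the split-and-schedule idea only; the concrete numbers (channels per task group, total channels) and the round-robin assignment are assumptions for illustration, not the actual JobContainer scheduling code:

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    public static void main(String[] args) {
        int taskCount = 1024;          // e.g. 1024 sharded MySQL tables -> 1024 read tasks
        int channelsPerTaskGroup = 5;  // assumed concurrency per task group
        int totalChannels = 20;        // assumed total concurrency configured for the job

        int taskGroupCount = (int) Math.ceil(1.0 * totalChannels / channelsPerTaskGroup);
        List<List<Integer>> taskGroups = new ArrayList<>();
        for (int g = 0; g < taskGroupCount; g++) {
            taskGroups.add(new ArrayList<>());
        }
        // Round-robin assignment of tasks to task groups.
        for (int task = 0; task < taskCount; task++) {
            taskGroups.get(task % taskGroupCount).add(task);
        }
        for (int g = 0; g < taskGroupCount; g++) {
            System.out.printf("task group %d runs %d tasks with %d channels%n",
                    g, taskGroups.get(g).size(), channelsPerTaskGroup);
        }
    }
}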
Key source code:
The following walks through the source code using MySQL as the example.
Source-code analysis of the read path
// MySQL reader source code
@Override
public void startRead(RecordSender recordSender) {
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

    this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
            super.getTaskPluginCollector(), fetchSize);
}

// The rdbms startRead it delegates to; pay attention to the transportOneRecord method
public void startRead(Configuration readerSliceConfig,
                      RecordSender recordSender,
                      TaskPluginCollector taskPluginCollector, int fetchSize) {
    ......
    while (rs.next()) {
        if (limit != null && i > limit) {
            break;
        }
        rsNextUsedTime += (System.nanoTime() - lastTime);
        this.transportOneRecord(recordSender, rs,
                metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
        lastTime = System.nanoTime();
        i++;
    }

    allResultPerfRecord.end(rsNextUsedTime);
    LOG.info("Finished read record by Sql: [{}\n] {}.",
            querySql, basicMsg);
    ......
}

// Build a record from the current ResultSet row and send it to the channel queue
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,
        ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
        TaskPluginCollector taskPluginCollector) {
    Record record = buildRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
    recordSender.sendToWriter(record);
    return record;
}
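buildRecord itself is not shown above. The sketch below shows roughly what that step involves: walk the ResultSetMetaData, convert each JDBC column according to its type, and assemble the values into a record. It is a simplified stand-in using plain Java types; the RowRecord class and buildRowRecord method are invented for illustration, not DataX's own Record/Column API.

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

public class RowMapperSketch {
    // Simplified stand-in for a DataX record: an ordered list of column values.
    public static class RowRecord {
        public final List<Object> columns = new ArrayList<>();
    }

    // Roughly what building a record from one ResultSet row looks like.
    public static RowRecord buildRowRecord(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
        RowRecord record = new RowRecord();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            switch (metaData.getColumnType(i)) {
                case Types.INTEGER:
                case Types.BIGINT:
                case Types.SMALLINT:
                case Types.TINYINT:
                    record.columns.add(rs.getLong(i));
                    break;
                case Types.TIMESTAMP:
                    record.columns.add(rs.getTimestamp(i));
                    break;
                default:
                    // Fall back to a string representation for everything else.
                    record.columns.add(rs.getString(i));
            }
        }
        return record;
    }
}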
Source code for pushing records into the queue, pulling them back out, and the channel (queue) design
// Receives records from the reader and buffers them; the buffer is flushed into the channel when full
@Override
public void sendToWriter(Record record) {
    if (shutdown) {
        throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
    }

    Validate.notNull(record, "record must not be null.");

    if (record.getMemorySize() > this.byteCapacity) {
        this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("A single record exceeds the size limit; the current limit is: %s", this.byteCapacity)));
        return;
    }

    boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
    if (isFull) {
        // flush() pushes the buffered records into the channel queue
        flush();
    }

    this.buffer.add(record);
    this.bufferIndex++;
    memoryBytes.addAndGet(record.getMemorySize());
}

// Writer side: fetch the records the reader previously pushed into the channel
@Override
public Record getFromReader() {
    if (shutdown) {
        throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
    }
    boolean isEmpty = (this.bufferIndex >= this.buffer.size());
    if (isEmpty) {
        receive();
    }

    Record record = this.buffer.get(this.bufferIndex++);
    if (record instanceof TerminateRecord) {
        record = null;
    }
    return record;
}

// getFromReader calls receive(), which drains the channel into the local buffer
private void receive() {
    this.channel.pullAll(this.buffer);
    this.bufferIndex = 0;
    this.bufferSize = this.buffer.size();
}

// The concrete in-memory Channel implementation; the underlying structure is an ArrayBlockingQueue
public class MemoryChannel extends Channel {

    private int bufferSize = 0;

    private AtomicInteger memoryBytes = new AtomicInteger(0);

    private ArrayBlockingQueue<Record> queue = null;
    ......
}
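The MemoryChannel fields above only show the ArrayBlockingQueue. The push/pull behaviour the exchanger relies on can be sketched as follows; this is a simplified illustration of the pushAll/pullAll semantics, not the actual MemoryChannel code, which additionally tracks byte limits and statistics:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Simplified in-memory channel: a bounded FIFO queue with batch push/pull,
// mirroring the pushAll/pullAll calls used by the record exchanger above.
public class SimpleMemoryChannel<T> {
    private final ArrayBlockingQueue<T> queue;

    public SimpleMemoryChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Reader side: block until every buffered record has been enqueued.
    public void pushAll(Collection<T> records) throws InterruptedException {
        for (T record : records) {
            queue.put(record); // blocks when the queue is full (back-pressure on the reader)
        }
    }

    // Writer side: block for at least one record, then drain whatever else is available.
    public void pullAll(List<T> buffer) throws InterruptedException {
        buffer.clear();
        buffer.add(queue.take()); // blocks when the queue is empty
        queue.drainTo(buffer);
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleMemoryChannel<String> channel = new SimpleMemoryChannel<>(8);
        channel.pushAll(java.util.Arrays.asList("a", "b", "c"));
        List<String> out = new ArrayList<>();
        channel.pullAll(out);
        System.out.println(out); // [a, b, c]
    }
}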
Source-code analysis of the write path
// MySQL writer source code
public void startWrite(RecordReceiver recordReceiver) {
    this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
            super.getTaskPluginCollector());
}

// The startWriteWithConnection sub-method called by the rdbms startWrite method
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
    ......
    List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
    int bufferBytes = 0;
    try {
        Record record;
        // recordReceiver.getFromReader() pulls the next record from the channel
        while ((record = recordReceiver.getFromReader()) != null) {
            if (record.getColumnNumber() != this.columnNumber) {
                // The number of columns read from the source does not match the destination table; fail immediately
                throw DataXException
                        .asDataXException(
                                DBUtilErrorCode.CONF_ERROR,
                                String.format(
                                        "Column configuration error: the configured job reads %s column(s) from the source, but the destination table expects %s column(s). Please check and fix your configuration.",
                                        record.getColumnNumber(),
                                        this.columnNumber));
            }

            writeBuffer.add(record);
            bufferBytes += record.getMemorySize();

            if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                doBatchInsert(connection, writeBuffer);
                writeBuffer.clear();
                bufferBytes = 0;
            }
        }
        if (!writeBuffer.isEmpty()) {
            doBatchInsert(connection, writeBuffer);
            writeBuffer.clear();
            bufferBytes = 0;
        }
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        writeBuffer.clear();
        bufferBytes = 0;
        DBUtil.closeDBResources(null, null, connection);
    }
    ......
}
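doBatchInsert is not shown above. A typical JDBC batch insert has the shape sketched below; this is a generic illustration under assumed table and column names, not the actual DataX implementation, which also handles retries and dirty-record collection:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchInsertSketch {
    // Generic shape of a batched insert: one PreparedStatement, addBatch per row,
    // a single executeBatch, and an explicit commit.
    public static void batchInsert(Connection connection, List<Object[]> rows) throws SQLException {
        String sql = "INSERT INTO target_table (col_a, col_b) VALUES (?, ?)"; // placeholder table/columns
        boolean oldAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            for (Object[] row : rows) {
                ps.setObject(1, row[0]);
                ps.setObject(2, row[1]);
                ps.addBatch();
            }
            ps.executeBatch();
            connection.commit();
        } catch (SQLException e) {
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(oldAutoCommit);
        }
    }
}

Batching amortizes network round-trips, which is why the writer flushes on either a record-count threshold (batchSize) or a byte threshold (batchByteSize).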