数据传输大致过程:
datax采用插件模式设计,reader与writer之间完全解耦,可做到互不影响。datax有三大部分:reader、writer、channel。reader和writer间依赖channel传输数据,reader通过recordSender.sendToWriter()往channel写入数据,writer通过recordReceiver.getFromReader()从channel拉取数据,channel的底层是一个队列,先进先出(ArrayBlockingQueue)。
作业运行设计:
Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。
Task: Task是为最大化并发而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,由若干个并发通道分组执行。
TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup
JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker。
TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
重点源码:
下面以MySQL为例,进行源代码分析
读取过程源码分析
1 // MySQL 读取 源代码 2 @Override 3 public void startRead(RecordSender recordSender) { 4 int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE); 5 6 this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, 7 super.getTaskPluginCollector(), fetchSize); 8 } 9 10 // 调用 rdbms 的 startRead 源代码,重点看 transportOneRecord 方法 11 public void startRead(Configuration readerSliceConfig, 12 RecordSender recordSender, 13 TaskPluginCollector taskPluginCollector, int fetchSize) { 14 ...... 15 while (rs.next()) { 16 if (limit != null && i > limit) { 17 break; 18 } 19 rsNextUsedTime += (System.nanoTime() - lastTime); 20 this.transportOneRecord(recordSender, rs, 21 metaData, columnNumber, mandatoryEncoding, taskPluginCollector); 22 lastTime = System.nanoTime(); 23 i++; 24 } 25 26 allResultPerfRecord.end(rsNextUsedTime); 27 LOG.info("Finished read record by Sql: [{}\n] {}.", 28 querySql, basicMsg); 29 ...... 30 } 31 32 // 发送到 channel 队列中 33 protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 34 ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 35 TaskPluginCollector taskPluginCollector) { 36 Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 37 recordSender.sendToWriter(record); 38 return record; 39 }
读取到队列,以及出队列 和 队列 channel 设计 源码
1 // 从reader中读取到数据 源代码 2 @Override 3 public void sendToWriter(Record record) { 4 if(shutdown){ 5 throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); 6 } 7 8 Validate.notNull(record, "record不能为空."); 9 10 if (record.getMemorySize() > this.byteCapacity) { 11 this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity))); 12 return; 13 } 14 15 boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity); 16 if (isFull) { 17 flush(); 18 } 19 20 this.buffer.add(record); 21 this.bufferIndex++; 22 memoryBytes.addAndGet(record.getMemorySize()); 23 } 24 25 // reader 读取时 调用 flush 26 @Override 27 public void sendToWriter(Record record) { 28 if(shutdown){ 29 throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); 30 } 31 32 Validate.notNull(record, "record不能为空."); 33 34 if (record.getMemorySize() > this.byteCapacity) { 35 this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity))); 36 return; 37 } 38 39 boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity); 40 if (isFull) { 41 flush(); 42 } 43 44 this.buffer.add(record); 45 this.bufferIndex++; 46 memoryBytes.addAndGet(record.getMemorySize()); 47 } 48 49 // 写数据源代码 ,获取之前reader读取的数据 50 @Override 51 public Record getFromReader() { 52 if(shutdown){ 53 throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, ""); 54 } 55 boolean isEmpty = (this.bufferIndex >= this.buffer.size()); 56 if (isEmpty) { 57 receive(); 58 } 59 60 Record record = this.buffer.get(this.bufferIndex++); 61 if (record instanceof TerminateRecord) { 62 record = null; 63 } 64 return record; 65 } 66 67 // getFromReader 调用 receive 方法 68 private void receive() { 69 this.channel.pullAll(this.buffer); 70 this.bufferIndex = 0; 71 this.bufferSize = this.buffer.size(); 72 
} 73 74 75 // 内存Channel的具体实现,底层其实是一个ArrayBlockingQueue 76 77 public class MemoryChannel extends Channel { 78 79 private int bufferSize = 0; 80 81 private AtomicInteger memoryBytes = new AtomicInteger(0); 82 83 private ArrayBlockingQueue<Record> queue = null; 84 ...... 85 }
写入过程源代码分析
1 // MySQL 写入 源代码 2 public void startWrite(RecordReceiver recordReceiver) { 3 this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig, 4 super.getTaskPluginCollector()); 5 } 6 7 // rdbms 的 startWrite方法 中的 startWriteWithConnection 子方法源代码如下 8 public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { 9 ...... 10 List<Record> writeBuffer = new ArrayList<Record>(this.batchSize); 11 int bufferBytes = 0; 12 try { 13 Record record; 14 while ((record = recordReceiver.getFromReader()) != null) { 15 if (record.getColumnNumber() != this.columnNumber) { 16 // 源头读取字段列数与目的表字段写入列数不相等,直接报错 17 throw DataXException 18 .asDataXException( 19 DBUtilErrorCode.CONF_ERROR, 20 String.format( 21 "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", 22 record.getColumnNumber(), 23 this.columnNumber)); 24 } 25 26 writeBuffer.add(record); 27 bufferBytes += record.getMemorySize(); 28 29 if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) { 30 doBatchInsert(connection, writeBuffer); 31 writeBuffer.clear(); 32 bufferBytes = 0; 33 } 34 } 35 if (!writeBuffer.isEmpty()) { 36 doBatchInsert(connection, writeBuffer); 37 writeBuffer.clear(); 38 bufferBytes = 0; 39 } 40 } catch (Exception e) { 41 throw DataXException.asDataXException( 42 DBUtilErrorCode.WRITE_DATA_ERROR, e); 43 } finally { 44 writeBuffer.clear(); 45 bufferBytes = 0; 46 DBUtil.closeDBResources(null, null, connection); 47 } 48 ...... 49 } 50 // record = recordReceiver.getFromReader() 获取record