DataX 数据传输源码简单分析


数据传输大致过程:

DataX 采用插件模式设计,reader 与 reader 之间、reader 与 writer 之间完全解耦,可做到互不影响。DataX 有三大部分:reader、writer、channel。reader 和 writer 之间依赖 channel 传输数据:reader 通过 recordSender.sendToWriter() 往 channel 写入数据,writer 通过 recordReceiver.getFromReader() 从 channel 拉取数据。channel 的底层是一个先进先出的队列(ArrayBlockingQueue)。

作业运行设计:

Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。

Task: Task是为最大化并发度而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。

TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup。

JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Hadoop MapReduce 1.x(MRv1)中的JobTracker。

TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Hadoop MapReduce 1.x(MRv1)中的TaskTracker。

重点源码:

下面以MySQL为例,进行源代码分析

读取过程源码分析

 1 // MySQL 读取 源代码
 2 @Override
 3 public void startRead(RecordSender recordSender) {
 4     int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
 5 
 6     this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
 7             super.getTaskPluginCollector(), fetchSize);
 8 }
 9 
10 // 调用 rdbms 的 startRead 源代码,重点看 transportOneRecord 方法
11 public void startRead(Configuration readerSliceConfig,
12                       RecordSender recordSender,
13                       TaskPluginCollector taskPluginCollector, int fetchSize) {
14     ......
15         while (rs.next()) {
16             if (limit != null && i > limit) {
17                 break;
18             }
19             rsNextUsedTime += (System.nanoTime() - lastTime);
20             this.transportOneRecord(recordSender, rs,
21                     metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
22             lastTime = System.nanoTime();
23             i++;
24         }
25 
26         allResultPerfRecord.end(rsNextUsedTime);
27         LOG.info("Finished read record by Sql: [{}\n] {}.",
28                 querySql, basicMsg);
29     ......
30 }
31 
32 // 发送到 channel 队列中
33 protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
34         ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
35         TaskPluginCollector taskPluginCollector) {
36     Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
37     recordSender.sendToWriter(record);
38     return record;
39 }

数据写入队列、从队列取出,以及队列 channel 设计的源码

 1 // reader 读取到数据后调用 sendToWriter,先把记录写入缓冲区 源代码
 2 @Override
 3 public void sendToWriter(Record record) {
 4     if(shutdown){
 5         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
 6     }
 7 
 8     Validate.notNull(record, "record不能为空.");
 9 
10     if (record.getMemorySize() > this.byteCapacity) {
11         this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
12         return;
13     }
14 
15     boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
16     if (isFull) {
17         flush();
18     }
19 
20     this.buffer.add(record);
21     this.bufferIndex++;
22     memoryBytes.addAndGet(record.getMemorySize());
23 }
24 
25 // sendToWriter 在缓冲区已满(isFull)时调用 flush(见下方第 40-42 行);注意:此处贴出的仍是 sendToWriter,与上面的代码重复,并非 flush 的实现
26 @Override
27 public void sendToWriter(Record record) {
28     if(shutdown){
29         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
30     }
31 
32     Validate.notNull(record, "record不能为空.");
33 
34     if (record.getMemorySize() > this.byteCapacity) {
35         this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
36         return;
37     }
38 
39     boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
40     if (isFull) {
41         flush();
42     }
43 
44     this.buffer.add(record);
45     this.bufferIndex++;
46     memoryBytes.addAndGet(record.getMemorySize());
47 }
48 
49 // 写数据源代码 ,获取之前reader读取的数据 
50 @Override
51 public Record getFromReader() {
52     if(shutdown){
53         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
54     }
55     boolean isEmpty = (this.bufferIndex >= this.buffer.size());
56     if (isEmpty) {
57         receive();
58     }
59 
60     Record record = this.buffer.get(this.bufferIndex++);
61     if (record instanceof TerminateRecord) {
62         record = null;
63     }
64     return record;
65 }
66 
67 // getFromReader 调用 receive 方法
68 private void receive() {
69     this.channel.pullAll(this.buffer);
70     this.bufferIndex = 0;
71     this.bufferSize = this.buffer.size();
72 }
73 
74 
75 // 内存Channel的具体实现,底层其实是一个ArrayBlockingQueue
76 
77 public class MemoryChannel extends Channel {
78 
79     private int bufferSize = 0;
80 
81     private AtomicInteger memoryBytes = new AtomicInteger(0);
82 
83     private ArrayBlockingQueue<Record> queue = null;
84     ......
85 }

写入过程源代码分析

 1 // MySQL 写入 源代码
 2 public void startWrite(RecordReceiver recordReceiver) {
 3     this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
 4             super.getTaskPluginCollector());
 5 }
 6 
 7 // rdbms 的 startWrite方法 中的 startWriteWithConnection 子方法源代码如下
 8 public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
 9    ......
10     List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
11     int bufferBytes = 0;   
12     try {
13         Record record;
14         while ((record = recordReceiver.getFromReader()) != null) {
15             if (record.getColumnNumber() != this.columnNumber) {
16                 // 源头读取字段列数与目的表字段写入列数不相等,直接报错
17                 throw DataXException
18                         .asDataXException(
19                                 DBUtilErrorCode.CONF_ERROR,
20                                 String.format(
21                                         "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
22                                         record.getColumnNumber(),
23                                         this.columnNumber));
24             }
25 
26             writeBuffer.add(record);
27             bufferBytes += record.getMemorySize();
28 
29             if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
30                 doBatchInsert(connection, writeBuffer);
31                 writeBuffer.clear();
32                 bufferBytes = 0;
33             }
34         }
35         if (!writeBuffer.isEmpty()) {
36             doBatchInsert(connection, writeBuffer);
37             writeBuffer.clear();
38             bufferBytes = 0;
39         }
40     } catch (Exception e) {
41         throw DataXException.asDataXException(
42                 DBUtilErrorCode.WRITE_DATA_ERROR, e);
43     } finally {
44         writeBuffer.clear();
45         bufferBytes = 0;
46         DBUtil.closeDBResources(null, null, connection);
47     }
48    ......
49 }
50  // writer 通过 record = recordReceiver.getFromReader() 逐条获取 record;返回 null(遇到 TerminateRecord)时循环结束

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM