Datax 數據傳輸源碼簡單分析


數據傳輸大致過程:

datax采用插件模式設計,reader與reader之間,reader與writer之間完全解耦,可做到互不影響。datax有三大部分,reader,writer,channel,reader和writer間依賴channel傳輸數據,reader通過recordSender.sendToWriter()往channel寫入數據,writer通過recordReceiver.getFromReader()從channel拉取數據,channel的底層是一個隊列,先進先出(ArrayBlockingQueue)。

作業運行設計:

Job: Job是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX數據同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。

Task: Task是為最大化並發度而把Job拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job,拆分成1024個讀Task,用若干個並發執行。

TaskGroup: 描述的是一組Task集合。在同一個TaskGroupContainer執行下的Task集合稱之為TaskGroup。

JobContainer: Job執行器,負責Job全局拆分、調度、前置語句和后置語句等工作的工作單元。類似Yarn中的JobTracker。

TaskGroupContainer: TaskGroup執行器,負責執行一組Task的工作單元,類似Yarn中的TaskTracker。

重點源碼:

下面以MySQL為例,進行源代碼分析

讀取過程源碼分析

 1 // MySQL 讀取 源代碼
 2 @Override
 3 public void startRead(RecordSender recordSender) {
 4     int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
 5 
 6     this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
 7             super.getTaskPluginCollector(), fetchSize);
 8 }
 9 
10 // 調用 rdbms 的 startRead 源代碼,重點看 transportOneRecord 方法
11 public void startRead(Configuration readerSliceConfig,
12                       RecordSender recordSender,
13                       TaskPluginCollector taskPluginCollector, int fetchSize) {
14     ......
15         while (rs.next()) {
16             if (limit != null && i > limit) {
17                 break;
18             }
19             rsNextUsedTime += (System.nanoTime() - lastTime);
20             this.transportOneRecord(recordSender, rs,
21                     metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
22             lastTime = System.nanoTime();
23             i++;
24         }
25 
26         allResultPerfRecord.end(rsNextUsedTime);
27         LOG.info("Finished read record by Sql: [{}\n] {}.",
28                 querySql, basicMsg);
29     ......
30 }
31 
32 // 發送到 channel 隊列中
33 protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
34         ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
35         TaskPluginCollector taskPluginCollector) {
36     Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
37     recordSender.sendToWriter(record);
38     return record;
39 }

寫入隊列、讀出隊列,以及隊列 channel 設計的源碼

 1 // 從reader中讀取到數據 源代碼
 2 @Override
 3 public void sendToWriter(Record record) {
 4     if(shutdown){
 5         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
 6     }
 7 
 8     Validate.notNull(record, "record不能為空.");
 9 
10     if (record.getMemorySize() > this.byteCapacity) {
11         this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("單條記錄超過大小限制,當前限制為:%s", this.byteCapacity)));
12         return;
13     }
14 
15     boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
16     if (isFull) {
17         flush();
18     }
19 
20     this.buffer.add(record);
21     this.bufferIndex++;
22     memoryBytes.addAndGet(record.getMemorySize());
23 }
24 
25 // reader 發送記錄時,若緩沖區已滿(isFull)則調用 flush()(見下方第 40-42 行;此處列出的仍是 sendToWriter 的源代碼)
26 @Override
27 public void sendToWriter(Record record) {
28     if(shutdown){
29         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
30     }
31 
32     Validate.notNull(record, "record不能為空.");
33 
34     if (record.getMemorySize() > this.byteCapacity) {
35         this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("單條記錄超過大小限制,當前限制為:%s", this.byteCapacity)));
36         return;
37     }
38 
39     boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
40     if (isFull) {
41         flush();
42     }
43 
44     this.buffer.add(record);
45     this.bufferIndex++;
46     memoryBytes.addAndGet(record.getMemorySize());
47 }
48 
49 // 寫數據源代碼 ,獲取之前reader讀取的數據 
50 @Override
51 public Record getFromReader() {
52     if(shutdown){
53         throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
54     }
55     boolean isEmpty = (this.bufferIndex >= this.buffer.size());
56     if (isEmpty) {
57         receive();
58     }
59 
60     Record record = this.buffer.get(this.bufferIndex++);
61     if (record instanceof TerminateRecord) {
62         record = null;
63     }
64     return record;
65 }
66 
67 // getFromReader 調用 receive 方法
68 private void receive() {
69     this.channel.pullAll(this.buffer);
70     this.bufferIndex = 0;
71     this.bufferSize = this.buffer.size();
72 }
73 
74 
75 // 內存Channel的具體實現,底層其實是一個ArrayBlockingQueue
76 
77 public class MemoryChannel extends Channel {
78 
79     private int bufferSize = 0;
80 
81     private AtomicInteger memoryBytes = new AtomicInteger(0);
82 
83     private ArrayBlockingQueue<Record> queue = null;
84     ......
85 }

寫入過程源代碼分析

 1 // MySQL 寫入 源代碼
 2 public void startWrite(RecordReceiver recordReceiver) {
 3     this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
 4             super.getTaskPluginCollector());
 5 }
 6 
 7 // rdbms 的 startWrite方法 中的 startWriteWithConnection 子方法源代碼如下
 8 public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
 9    ......
10     List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
11     int bufferBytes = 0;   
12     try {
13         Record record;
14         while ((record = recordReceiver.getFromReader()) != null) {
15             if (record.getColumnNumber() != this.columnNumber) {
16                 // 源頭讀取字段列數與目的表字段寫入列數不相等,直接報錯
17                 throw DataXException
18                         .asDataXException(
19                                 DBUtilErrorCode.CONF_ERROR,
20                                 String.format(
21                                         "列配置信息有錯誤. 因為您配置的任務中,源頭讀取字段數:%s 與 目的表要寫入的字段數:%s 不相等. 請檢查您的配置並作出修改.",
22                                         record.getColumnNumber(),
23                                         this.columnNumber));
24             }
25 
26             writeBuffer.add(record);
27             bufferBytes += record.getMemorySize();
28 
29             if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
30                 doBatchInsert(connection, writeBuffer);
31                 writeBuffer.clear();
32                 bufferBytes = 0;
33             }
34         }
35         if (!writeBuffer.isEmpty()) {
36             doBatchInsert(connection, writeBuffer);
37             writeBuffer.clear();
38             bufferBytes = 0;
39         }
40     } catch (Exception e) {
41         throw DataXException.asDataXException(
42                 DBUtilErrorCode.WRITE_DATA_ERROR, e);
43     } finally {
44         writeBuffer.clear();
45         bufferBytes = 0;
46         DBUtil.closeDBResources(null, null, connection);
47     }
48    ......
49 }
50  // record = recordReceiver.getFromReader() 獲取record 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM