datax里所有的關系型數據庫都走通用的處理Reader,com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader,當進行split的時候會進行切分獲取channel的個數,
public List<Configuration> split(Configuration originalConfig, int adviceNumber) { return ReaderSplitUtil.doSplit(originalConfig, adviceNumber); }
繼續進入doSplit方法com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil 此處會去判斷是否是table配置的模式
if (isTableMode) { // adviceNumber這里是channel數量大小, 即datax並發task數量 // eachTableShouldSplittedNumber是單表應該切分的份數, 向上取整可能和adviceNumber沒有比例關系了已經 eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber( adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK)); }
如果前面計算的adviceNumber=3 配置了一個table 則每個表分到的channel是3/1=3,
// 說明是配置的 table 方式 if (isTableMode) { // 已在之前進行了擴展和`處理,可以直接使用 List<String> tables = connConf.getList(Key.TABLE, String.class); Validate.isTrue(null != tables && !tables.isEmpty(), "您讀取數據庫表配置錯誤."); String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null); /** * 1-判斷是否配置了splitPk,如果沒有配置則每個table都會當成一個任務,生成一個配置文件給任務運行使用 * 2-如果配置了splitPk,如果只配了一個table,則重新計算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5; * 3-如果配置了多個table,eachTableShouldSplittedNumber不變,然后循環對每個表進行切分splitSingleTable */ //最終切分份數不一定等於 eachTableShouldSplittedNumber boolean needSplitTable = eachTableShouldSplittedNumber > 1 && StringUtils.isNotBlank(splitPk); if (needSplitTable) { if (tables.size() == 1) { //原來:如果是單表的,主鍵切分num=num*2+1 // splitPk is null這類的情況的數據量本身就比真實數據量少很多, 和channel大小比率關系時,不建議考慮 //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不應該加1導致長尾 //考慮其他比率數字?(splitPk is null, 忽略此長尾) eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; } // 嘗試對每個表,切分為eachTableShouldSplittedNumber 份 for (String table : tables) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.TABLE, table); List<Configuration> splittedSlices = SingleTableSplitUtil .splitSingleTable(tempSlice, eachTableShouldSplittedNumber); splittedConfigs.addAll(splittedSlices); } } else { for (String table : tables) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.TABLE, table); String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column); tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); splittedConfigs.add(tempSlice); } } } else { // 說明是配置的 querySql 方式 List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class); // TODO 是否check 配置為多條語句?? for (String querySql : sqls) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.QUERY_SQL, querySql); splittedConfigs.add(tempSlice); } }
此處主要分為兩種,一種是table模式配置,一種是querysql模式配置, querySql模式相對簡單,配置了幾個sql,就會生成幾個任務的配置文件。
我們主要關注table模式,主要的流程:
/** * 1-判斷是否配置了splitPk,如果沒有配置則每個table都會當成一個任務,生成一個配置文件給任務運行使用 * 2-如果配置了splitPk,如果只配了一個table,則重新計算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5; * 3-如果配置了多個table,eachTableShouldSplittedNumber不變,然后循環對每個表進行切分splitSingleTable */
接下來關注splitSingleTable方法
大體流程是:
首先會根據 Configuration configuration, int adviceNum 配置文件信息和需要切分的個數進行切分會計算出splitPk的最大最小值,然后按照adviceNum進行分割,然后生成具體的sql
如果配置了15個channel,單表, 拆分,則此時通過上面的流程最后可以計算出task數目為75,在我的mysql表中有3089條數據,此時他會返回下面的配置總共75個有效配置,主要是看querySql,每個配置的條件都會不同。這只是其中一個
{ "column": "id,username,telephone", "columnList": ["id", "username", "telephone"], "fetchSize": -2147483648, "isTableMode": true, "jdbcUrl": "jdbc:mysql://localhost:3306/datax?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true", "loadBalanceResourceMark": "localhost", "password": "root123", "pkType": "pkTypeLong", "querySql": "select id,username,telephone from user where (547 <= id AND id < 588) ", "splitPk": "id", "table": "user", "tableNumber": 1, "username": "root" } 第一個sql: "querySql": "select id,username,telephone from user where (1 <= id AND id < 43) " 最后一個sql是: "querySql": "select id,username,telephone from user where id IS NULL" 最后一個有效sql是: "querySql": "select id,username,telephone from user where (3048 <= id AND id <= 3089) " 可以看到和表中的數據一致,這里主要是把數據量過大的表,按照配置計算出的channel進行切分, 然后生成分段的sql,一個sql對應一個任務,進行執行。
后面會根據taskGroup的個數,來對着75個任務進行分組,然后提交到線程池中並發執行任務.
如果沒有SPLIT_PK(目前splitPk僅支持整形、字符串數據切分),則為每個表生成一個任務
在DataX中,mysqlreader配置有兩種模式,一種是table模式,另外一種是querySql模式,兩種模式使用起來略有差別。
table模式
在table模式下, channel個數決定了reader和writer的個數上限,假設為m個:如果指定了splitPk字段,DataX會將mysql表中數據按照splitPk切分成n段,n大致為5倍的channel個數。
splitPk的字段限制了必需是整型或者字符串類型。由於DataX的實現方式是按照spliPk字段分段查詢數據庫表,那么spliPk字段的選取應該盡可能的選擇分布均勻且有索引的字段,比如主鍵id、唯一鍵等字段。DataX會啟動m個reader線程,消費DataX切分好的n個查詢sql語句(task), 對應的會有m個writer線程將查詢出來的數據寫入目標數據源中,並行度為m(也就是配置的channel個數),如果不指定splitPk字段,DataX將不會進行數據的切分,並行度直接退化成1。
需要指出的是,table模式下,如果用戶指定了spliPk將數據切分成了n段,由於這些task不是在同一個事務下進行select,那么最終取出的全量數據很有可能是不一致的。為了拿到一致性數據,要么不要配置spliPk使用單線程,要么確保mysql中要導出的數據不會再發生變化。
querySql模式
querySql模式一般用於有條件的數據導出,
"connection": [ { "querySql": [ #指定執行的SQL語句 "select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' " ], "jdbcUrl": ["jdbc:mysql://10.10.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc連接串 ] } ]
在此模式下,DataX不會再按照指定的column、table參數進行sql的拼接,而是會直接略過這些配置(如果有),直接執行querySql語句,task數量總是1,因此在此模式下channel的配置不再有多線程的效果。
性能調優
有人肯定會有疑問,有什么辦法可以盡可能加速數據的導出呢?
一般來說,大家首先想到的是提高並發度。在DataX中channel的個數決定了並發數量,但是要使channel參數生效,並不是簡單配一下channel就完事了。在MySQL導入Tablestore表的場景下,channel生效僅在能夠split出多個SQL語句的場景下,也就是table模式+spliPk下有用。
DataX的數據同步涉及三部分:
1.數據讀取 2.數據交換 3.數據寫入
對於以上三個環節,都有不同的優化方式,分析如下。
1.數據讀取
對於數據源讀取,導出的兩種模式:table模式和sqlQuery模式前面做了闡述,這里不再重復。
2. 數據交換
對於數據交換,前面提到,發送給MySQL數據庫SQL語句后會得到查詢的數據集,緩存在DataX的buffer中;除此之外,每個channel也維護了自己的record隊列,如果存在並發,channel的個數越多,也會需要更多的內存。因此首先需要考慮的是jvm的內存大小參數, 這個在啟動jvm進程的時候配置。
除此之外,有幾個控制channel的關鍵參數
"transport": { "channel": { "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "speed": { "byte": -1, "record": -1 }, "flowControlInterval": 20, "capacity": 512, "byteCapacity": 67108864 }, "exchanger": { "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", "bufferSize": 32 } },
以上配置位於conf/core.json中:capacity限制了channel中隊列的大小(也就是最多緩存record的個數)byteCapacity限制了record占用的內存大小,core.json中的默認配置是64MB,若不指定將會被配置為8MB
這兩個參數決定了每個channel能buffer的記錄數量和內存占用情況,如果有需要調整,用戶應該按照DataX實際的運行環境予以配置。例如MySQL中每個record都比較大,那么可以考慮適當調高byteCapacity,當然調整這個參數還要考慮機器的內存情況。
bufferSize指定了BufferedRecordExchanger的緩存,reader讀了多少個往channel放。
一般情況下,channel隊列本身配置的調整並不會很常見,但是對於另外幾個流控參數,在使用DataX的時候應該注意。有兩個常用的流控參數:
a. byte 限制通道的默認傳輸速率, -1表示不限制
b. record 限制通道的傳輸記錄數,-1表示不限制
這兩個參數都是在flowControlInterval間隔里采樣后根據采樣值來決定是否流控的。
{ "core": { #定義了全局的系統參數,不指定會使用默認值 "transport": { "channel": { "speed": { "record": 5000, "byte": 102400 } } } }, "job": { "setting": { "speed": { #定義了單個channel的控制參數 "record": 10000, }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { .....#省略 }, "writer": { .....#省略 } } ] } }
3.數據寫入
適當的提高批量寫入的批次大小(batchWriteCount),也可以有效地提高吞吐率。相關關鍵配置如下:
{ "job": { "setting": { ....#省略 }, "content": [ { "reader": { .....#省略 }, "writer": { "name": "otswriter", "parameter": { ....... "writeMode":"UpdateRow", "batchWriteCount":100 } } } ] } }