解讀Datax mysql reader配置


datax里所有的關系型數據庫都走通用的處理Reader,com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader,當進行split的時候會進行切分獲取channel的個數,

public List<Configuration> split(Configuration originalConfig,
                                         int adviceNumber) {
            return ReaderSplitUtil.doSplit(originalConfig, adviceNumber);
}

繼續進入doSplit方法com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil 此處會去判斷是否是table配置的模式

if (isTableMode) {
            // adviceNumber這里是channel數量大小, 即datax並發task數量
            // eachTableShouldSplittedNumber是單表應該切分的份數, 向上取整可能和adviceNumber沒有比例關系了已經
            eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                    adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
}

如果前面計算的adviceNumber=3 配置了一個table 則每個表分到的channel是3/1=3,

 
// 說明是配置的 table 方式
if (isTableMode) {
    // 已在之前進行了擴展和`處理,可以直接使用
    List<String> tables = connConf.getList(Key.TABLE, String.class);
 
    Validate.isTrue(null != tables && !tables.isEmpty(), "您讀取數據庫表配置錯誤.");
 
    String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
 
     /**
      * 1-判斷是否配置了splitPk,如果沒有配置則每個table都會當成一個任務,生成一個配置文件給任務運行使用
      * 2-如果配置了splitPk,如果只配了一個table,則重新計算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5;
      * 3-如果配置了多個table,eachTableShouldSplittedNumber不變,然后循環對每個表進行切分splitSingleTable
     */
    //最終切分份數不一定等於 eachTableShouldSplittedNumber
    boolean needSplitTable = eachTableShouldSplittedNumber > 1
            && StringUtils.isNotBlank(splitPk);
    if (needSplitTable) {
        if (tables.size() == 1) {
            //原來:如果是單表的,主鍵切分num=num*2+1
            // splitPk is null這類的情況的數據量本身就比真實數據量少很多, 和channel大小比率關系時,不建議考慮
            //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不應該加1導致長尾
             
            //考慮其他比率數字?(splitPk is null, 忽略此長尾)
            eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
        }
        // 嘗試對每個表,切分為eachTableShouldSplittedNumber 份
        for (String table : tables) {
            tempSlice = sliceConfig.clone();
            tempSlice.set(Key.TABLE, table);
 
            List<Configuration> splittedSlices = SingleTableSplitUtil
                    .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
 
            splittedConfigs.addAll(splittedSlices);
        }
    } else {
        for (String table : tables) {
            tempSlice = sliceConfig.clone();
            tempSlice.set(Key.TABLE, table);
            String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
            tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
            splittedConfigs.add(tempSlice);
        }
    }
} else {
    // 說明是配置的 querySql 方式
    List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
 
    // TODO 是否check 配置為多條語句??
    for (String querySql : sqls) {
        tempSlice = sliceConfig.clone();
        tempSlice.set(Key.QUERY_SQL, querySql);
        splittedConfigs.add(tempSlice);
    }
}

此處主要分為兩種,一種是table模式配置,一種是querysql模式配置, querySql模式相對簡單,配置了幾個sql,就會生成幾個任務的配置文件。

我們主要關注table模式,主要的流程:

 /**
 * 1-判斷是否配置了splitPk,如果沒有配置則每個table都會當成一個任務,生成一個配置文件給任務運行使用
 * 2-如果配置了splitPk,如果只配了一個table,則重新計算eachTableShouldSplittedNumber=eachTableShouldSplittedNumber * 5;
 * 3-如果配置了多個table,eachTableShouldSplittedNumber不變,然后循環對每個表進行切分splitSingleTable
*/

接下來關注splitSingleTable方法

大體流程是:
首先會根據 Configuration configuration, int adviceNum 配置文件信息和需要切分的個數進行切分會計算出splitPk的最大最小值,然后按照adviceNum進行分割,然后生成具體的sql

如果配置了15個channel,單表, 拆分,則此時通過上面的流程最后可以計算出task數目為75,在我的mysql表中有3089條數據,此時他會返回下面的配置總共75個有效配置,主要是看querySql,每個配置的條件都會不同。這只是其中一個

{
"column": "id,username,telephone",
"columnList": ["id", "username", "telephone"],
"fetchSize": -2147483648,
"isTableMode": true,
"jdbcUrl": "jdbc:mysql://localhost:3306/datax?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true",
"loadBalanceResourceMark": "localhost",
"password": "root123",
"pkType": "pkTypeLong",
"querySql": "select id,username,telephone from user  where  (547 <= id AND id < 588) ",
"splitPk": "id",
"table": "user",
"tableNumber": 1,
"username": "root"
}
第一個sql:
"querySql": "select id,username,telephone from user  where  (1 <= id AND id < 43) "
最后一個sql是:
"querySql": "select id,username,telephone from user  where  id IS NULL"
最后一個有效sql是:
"querySql": "select id,username,telephone from user  where  (3048 <= id AND id <= 3089) "
 
可以看到和表中的數據一致,這里主要是把數據量過大的表,按照配置計算出的channel進行切分,
然后生成分段的sql,一個sql對應一個任務,進行執行。

后面會根據taskGroup的個數,來對着75個任務進行分組,然后提交到線程池中並發執行任務.

如果沒有SPLIT_PK(目前splitPk僅支持整形、字符串數據切分),則為每個表生成一個任務

在DataX中,mysqlreader配置有兩種模式,一種是table模式,另外一種是querySql模式,兩種模式使用起來略有差別。

table模式

在table模式下, channel個數決定了reader和writer的個數上限,假設為m個:如果指定了splitPk字段,DataX會將mysql表中數據按照splitPk切分成n段,n大致為5倍的channel個數。

splitPk的字段限制了必需是整型或者字符串類型。由於DataX的實現方式是按照spliPk字段分段查詢數據庫表,那么spliPk字段的選取應該盡可能的選擇分布均勻且有索引的字段,比如主鍵id、唯一鍵等字段。DataX會啟動m個reader線程,消費DataX切分好的n個查詢sql語句(task), 對應的會有m個writer線程將查詢出來的數據寫入目標數據源中,並行度為m(也就是配置的channel個數),如果不指定splitPk字段,DataX將不會進行數據的切分,並行度直接退化成1。

需要指出的是,table模式下,如果用戶指定了spliPk將數據切分成了n段,由於這些task不是在同一個事務下進行select,那么最終取出的全量數據很有可能是不一致的。為了拿到一致性數據,要么不要配置spliPk使用單線程,要么確保mysql中要導出的數據不會再發生變化。

querySql模式

querySql模式一般用於有條件的數據導出,

 "connection": [
                            {
                                "querySql": [ #指定執行的SQL語句
                                    "select bucket_name, delta , timestamp ,cdn_in, cdn_out ,total_request from vip_quota where bucket_name='xxx' "
                                ],
                                "jdbcUrl": ["jdbc:mysql://10.10.0.8:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true" #jdbc連接串
                                ]
                            }
                        ]

在此模式下,DataX不會再按照指定的column、table參數進行sql的拼接,而是會直接略過這些配置(如果有),直接執行querySql語句,task數量總是1,因此在此模式下channel的配置不再有多線程的效果。

性能調優

有人肯定會有疑問,有什么辦法可以盡可能加速數據的導出呢?

一般來說,大家首先想到的是提高並發度。在DataX中channel的個數決定了並發數量,但是要使channel參數生效,並不是簡單配一下channel就完事了。在MySQL導入Tablestore表的場景下,channel生效僅在能夠split出多個SQL語句的場景下,也就是table模式+spliPk下有用。

DataX的數據同步涉及三部分:

1.數據讀取
2.數據交換
3.數據寫入

對於以上三個環節,都有不同的優化方式,分析如下。

1.數據讀取
對於數據源讀取,導出的兩種模式:table模式和sqlQuery模式前面做了闡述,這里不再重復。

2. 數據交換
對於數據交換,前面提到,發送給MySQL數據庫SQL語句后會得到查詢的數據集,緩存在DataX的buffer中;除此之外,每個channel也維護了自己的record隊列,如果存在並發,channel的個數越多,也會需要更多的內存。因此首先需要考慮的是jvm的內存大小參數, 這個在啟動jvm進程的時候配置。

除此之外,有幾個控制channel的關鍵參數

 
 "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },

以上配置位於conf/core.json中:capacity限制了channel中隊列的大小(也就是最多緩存record的個數)byteCapacity限制了record占用的內存大小,core.json中的默認配置是64MB,若不指定將會被配置為8MB

這兩個參數決定了每個channel能buffer的記錄數量和內存占用情況,如果有需要調整,用戶應該按照DataX實際的運行環境予以配置。例如MySQL中每個record都比較大,那么可以考慮適當調高byteCapacity,當然調整這個參數還要考慮機器的內存情況。

bufferSize指定了BufferedRecordExchanger的緩存,reader讀了多少個往channel放。

一般情況下,channel隊列本身配置的調整並不會很常見,但是對於另外幾個流控參數,在使用DataX的時候應該注意。有兩個常用的流控參數:

a. byte 限制通道的默認傳輸速率, -1表示不限制
b. record 限制通道的傳輸記錄數,-1表示不限制

這兩個參數都是在flowControlInterval間隔里采樣后根據采樣值來決定是否流控的。

 
{
    "core": {  #定義了全局的系統參數,不指定會使用默認值
        "transport": {
            "channel": {
                "speed": {     
                    "record": 5000,
                    "byte": 102400
                }
            }
        }
    },


    "job": {
        "setting": {
            "speed": {  #定義了單個channel的控制參數
                "record": 10000,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                      .....#省略
                },

                "writer": {
                    .....#省略
                }

            }
        ]
    }
}

3.數據寫入

適當的提高批量寫入的批次大小(batchWriteCount),也可以有效地提高吞吐率。相關關鍵配置如下:

{
    "job": {
        "setting": {
           ....#省略
        },
        "content": [
            {
                "reader": {
                      .....#省略
                },

                "writer": {
                    "name": "otswriter",
                    "parameter": {
                            .......
                        "writeMode":"UpdateRow",
                        "batchWriteCount":100
                    }
                }

            }
        ]
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM