Kudu series: using the Java API and testing its write throughput


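Kudu + Impala is a good fit for data analysis, but inserting rows into a Kudu table with Impala INSERT ... VALUES statements is very slow: in my tests the rate was only about 80 rows/second. The reason is fairly clear. Kudu itself writes efficiently, but Impala is not optimized for this pattern, and every Impala statement appeared to carry a large fixed overhead, so frequent small-batch writes perform very poorly. Kudu officially recommends doing the writes through the Java API or the Python API. Below is a test case using the Java API, which also shows roughly how the Kudu API is used.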

 

=========================
Prepare the test table
=========================

-- kudu table
CREATE TABLE kudu_testdb.tmp_test_perf
(
id string ENCODING PLAIN_ENCODING COMPRESSION SNAPPY,
int_value int ,
bigint_value bigint ,
timestamp_value timestamp ,
boolean_value int,
PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 6
STORED AS KUDU
TBLPROPERTIES (
'kudu.table_name' = 'testdb.tmp_test_perf',
'kudu.master_addresses' = '10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051',
'kudu.num_tablet_replicas' = '1'
)
;

 

=========================
Write the test Java program
=========================
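Notes on coding against the Kudu API:

1. Although column names in the Impala DDL are case-insensitive, at the Kudu level the column names have already been converted to lowercase, so column names passed to the Kudu API must be lowercase.
2. The case of the table name in the Impala DDL is preserved as-is rather than lowercased, and table names in the Kudu API are case-sensitive, so the name must match the DDL exactly.
3. The Kudu API setter methods do not accept null, so check whether a value is null before assigning it to a column. For example, the following two lines fail at run time (unboxing the null Long throws a NullPointerException):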

Long longTmp=null;
row.addLong("bigint_value",longTmp);
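
One way to guard against this is a small helper that checks for null before calling the setter. The sketch below is only an illustration: `RowLike` and `MapRow` are hypothetical stand-ins I made up so the example runs without a Kudu cluster; they mirror the `addLong(String, long)` and `setNull(String)` methods of `PartialRow`. Against a real table, `setNull` would only be valid for a nullable column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullSafeSetterSketch {

    /** Hypothetical stand-in exposing the two PartialRow-style methods used here. */
    interface RowLike {
        void addLong(String columnName, long value);
        void setNull(String columnName);
    }

    /** Sets a nullable Long without ever auto-unboxing a null reference. */
    static void addNullableLong(RowLike row, String columnName, Long value) {
        if (value == null) {
            row.setNull(columnName);
        } else {
            row.addLong(columnName, value);
        }
    }

    /** In-memory RowLike used only to demonstrate the guard (assumption, not Kudu code). */
    static final class MapRow implements RowLike {
        final Map<String, Object> cells = new LinkedHashMap<>();

        @Override
        public void addLong(String columnName, long value) {
            cells.put(columnName, value);
        }

        @Override
        public void setNull(String columnName) {
            cells.put(columnName, null);
        }
    }

    public static void main(String[] args) {
        MapRow row = new MapRow();
        Long longTmp = null;
        addNullableLong(row, "bigint_value", longTmp); // no NullPointerException
        addNullableLong(row, "other_value", 42L);
        System.out.println(row.cells);
    }
}
```

With a real `PartialRow`, the same `if (value == null)` branch would sit in front of each `addXxx` call for a nullable column.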

 
package kudu_perf_test;

import java.sql.Timestamp;
import java.util.UUID;
import org.apache.kudu.client.*;

public class Test {
    private final static int OPERATION_BATCH = 500;
    
    // Test case that supports all three flush modes
    public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode,
            int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        session.setFlushMode(mode);
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            session.setMutationBufferSpace(OPERATION_BATCH);
        }
        int uncommit = 0;

        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("int_value", 100);
            row.addLong("bigint_value", 10000L);
            
            
            Long gmtMillis;
            /* System.currentTimeMillis() returns milliseconds since 1970-01-01 (GMT). The Kudu API expects
             microseconds, hence the *1000 below. Because we are in the UTC+8 time zone, 8 hours are added
             to the long value; otherwise Kudu stores GMT time, which is 8 hours behind UTC+8.
             */
            
            // Method 1: Unix milliseconds (GMT) for the current time
            gmtMillis=System.currentTimeMillis(); 
            
            
            // Method 2: convert a Timestamp to Unix milliseconds (GMT)
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gmtMillis=localTimestamp.getTime();   
            
            // Shift the GMT milliseconds to UTC+8 milliseconds
            Long shanghaiTimezoneMillis=gmtMillis+8*3600*1000;
            row.addLong("timestamp_value", shanghaiTimezoneMillis*1000);
            
            session.apply(insert);

            // For manual flush, flush before the buffer fills up; here we flush once it is about half full
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                uncommit = uncommit + 1;
                if (uncommit > OPERATION_BATCH / 2) {
                    session.flush();
                    uncommit = 0;
                }
            }
        }

        // For manual flush, make sure the final partial batch is flushed
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && uncommit > 0) {
            session.flush();
        }

        // For background auto flush, the final flush must complete, and any errors must be raised as exceptions
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();  
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("Kudu overflow exception occurred.");
                }
                StringBuilder errorMessage = new StringBuilder();
                if (error.getRowErrors().length > 0) {
                    for (RowError errorObj : error.getRowErrors()) {
                        errorMessage.append(errorObj.toString());
                        errorMessage.append(";");
                    }
                }
                throw new Exception(errorMessage.toString());
            }
        }

    }

    // Test case for MANUAL_FLUSH only
    public static void insertTestManual(KuduSession session, KuduTable table, int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);

        int uncommit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("int_value", 100);
            row.addLong("bigint_value", 10000L);
            
            
            Long gmtMillis;
            /* System.currentTimeMillis() returns milliseconds since 1970-01-01 (GMT). The Kudu API expects
             microseconds, hence the *1000 below. Because we are in the UTC+8 time zone, 8 hours are added
             to the long value; otherwise Kudu stores GMT time, which is 8 hours behind UTC+8.
             */
            
            // Method 1: Unix milliseconds (GMT) for the current time
            gmtMillis=System.currentTimeMillis(); 
            
            
            // Method 2: convert a Timestamp to Unix milliseconds (GMT)
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gmtMillis=localTimestamp.getTime();   
            
            // Shift the GMT milliseconds to UTC+8 milliseconds
            Long shanghaiTimezoneMillis=gmtMillis+8*3600*1000;
            row.addLong("timestamp_value", shanghaiTimezoneMillis*1000);

            session.apply(insert);
            
            // For manual flush, flush before the buffer fills up; here we flush once it is about half full
            uncommit = uncommit + 1;
            if (uncommit > OPERATION_BATCH / 2) {
                session.flush();
                uncommit = 0;
            }
        }

        // For manual flush, make sure the final partial batch is flushed
        if (uncommit > 0) {
            session.flush();
        }
    }
   
    // Test case for AUTO_FLUSH_SYNC only
    public static void insertTestInAutoSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
        // SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
        // SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
        // SessionConfiguration.FlushMode.MANUAL_FLUSH
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);        

        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("int_value", 100);
            row.addLong("bigint_value", 10000L);
            
            
            Long gmtMillis;
            /* System.currentTimeMillis() returns milliseconds since 1970-01-01 (GMT). The Kudu API expects
             microseconds, hence the *1000 below. Because we are in the UTC+8 time zone, 8 hours are added
             to the long value; otherwise Kudu stores GMT time, which is 8 hours behind UTC+8.
             */
            
            // Method 1: Unix milliseconds (GMT) for the current time
            gmtMillis=System.currentTimeMillis(); 
            
            
            // Method 2: convert a Timestamp to Unix milliseconds (GMT)
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gmtMillis=localTimestamp.getTime();   
            
            // Shift the GMT milliseconds to UTC+8 milliseconds
            Long shanghaiTimezoneMillis=gmtMillis+8*3600*1000;
            row.addLong("timestamp_value", shanghaiTimezoneMillis*1000);
            
            // In AUTO_FLUSH_SYNC mode, apply() completes the write to Kudu immediately
            session.apply(insert);
        }
    }

    public static void test() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("10.0.0.100:7051,10.0.0.101:7051,10.0.0.101:7051")
                .build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("testdb.tmp_test_perf");

        SessionConfiguration.FlushMode mode;
        Timestamp d1 = null;
        Timestamp d2 = null;
        long millis;
        long seconds;
        int recordCount = 0; // rows to insert per mode; 0 inserts nothing, so set this before running the test

        try {
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestGeneric(session, table, mode, recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000; // total elapsed seconds (a "% 60" here would wrap for runs over a minute)
            System.out.println(mode.name() + " elapsed seconds: " + seconds);

            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;            
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestInAutoSync(session, table,  recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000; // total elapsed seconds (a "% 60" here would wrap for runs over a minute)
            System.out.println(mode.name() + " elapsed seconds: " + seconds);
            
            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;                    
            d1 = new Timestamp(System.currentTimeMillis());
            insertTestManual(session, table,  recordCount);
            d2 = new Timestamp(System.currentTimeMillis());
            millis = d2.getTime() - d1.getTime();
            seconds = millis / 1000; // total elapsed seconds (a "% 60" here would wrap for runs over a minute)
            System.out.println(mode.name() + " elapsed seconds: " + seconds);
            

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
            }
        }

    }

    public static void main(String[] args) {
        try {
            test();
        } catch (KuduException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Done");

    }
}
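
The timestamp handling in the insert loops can also be written with `java.time` instead of a hard-coded 8-hour constant. The following is a minimal sketch under my own assumptions: the class and method names are hypothetical, and `Asia/Shanghai` is assumed to be the UTC+8 zone the article refers to. It only computes the long value that would be passed to `row.addLong("timestamp_value", ...)`; it does not talk to Kudu.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

final class KuduTimestampSketch {

    /** Converts epoch milliseconds to epoch microseconds, with no time-zone shift. */
    static long toEpochMicros(long epochMillis) {
        return TimeUnit.MILLISECONDS.toMicros(epochMillis);
    }

    /**
     * Shifts the instant by the zone's UTC offset before converting to microseconds,
     * so the stored value reads as local wall-clock time (the approach used in this article).
     */
    static long toLocalWallClockMicros(long epochMillis, ZoneId zone) {
        int offsetSeconds = zone.getRules()
                .getOffset(Instant.ofEpochMilli(epochMillis))
                .getTotalSeconds();
        return toEpochMicros(epochMillis + offsetSeconds * 1000L);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // assumed zone for "UTC+8"
        System.out.println("unshifted micros: " + toEpochMicros(now));
        System.out.println("UTC+8 micros:     " + toLocalWallClockMicros(now, shanghai));
    }
}
```

Note that a shifted value is no longer a true UTC instant, so every reader of the table has to know about the convention; whether that trade-off is acceptable depends on how the data is queried.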

 

=========================
Performance test results
=========================
MANUAL_FLUSH mode: 8000 rows/second
AUTO_FLUSH_BACKGROUND mode: 8000 rows/second
AUTO_FLUSH_SYNC mode: 1000 rows/second
Impala SQL INSERT statement: 80 rows/second
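
For reference, a rows/second figure like the ones above can be derived from a row count and an elapsed time. This is only a sketch of the arithmetic: the class name, the `rowsPerSecond` helper, and the placeholder workload in `main` are my own assumptions and are not part of the test program; in a real run the placeholder loop would be one of the insert methods.

```java
final class ThroughputSketch {

    /** Rows per second from a row count and an elapsed time in nanoseconds. */
    static double rowsPerSecond(long rows, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return rows * 1_000_000_000.0 / elapsedNanos;
    }

    public static void main(String[] args) {
        long rows = 100_000;
        long start = System.nanoTime(); // monotonic clock, unaffected by wall-clock changes
        long checksum = 0;
        for (long i = 0; i < rows; i++) {
            checksum += i; // placeholder workload standing in for the insert loop
        }
        long elapsed = Math.max(1, System.nanoTime() - start);
        System.out.printf("%d rows in %.3f ms = %.0f rows/second (checksum %d)%n",
                rows, elapsed / 1_000_000.0, rowsPerSecond(rows, elapsed), checksum);
    }
}
```

Dividing by the full elapsed time, rather than by whole seconds, avoids a division by zero on short runs and keeps sub-second differences visible.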

 

=========================
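Kudu API usage summary
=========================
1. Prefer MANUAL_FLUSH where possible. It performs best, and a failed write to Kudu surfaces at the flush() call itself (as a thrown KuduException, or as a row error in the OperationResponse list it returns), so the logic is very clear.
2. When performance requirements are modest, AUTO_FLUSH_SYNC is also a good choice.
3. Use AUTO_FLUSH_BACKGROUND only in demo scenarios: when error handling is ignored the code can be very simple, and performance is also good. It is not recommended in production because rows may be written out of order, and the code becomes unwieldy once error handling is added.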
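
The MANUAL_FLUSH pattern recommended in point 1 boils down to counting applied operations, flushing before the buffer fills, and flushing whatever remains at the end. The sketch below isolates just that control flow. `BufferedSink` and `RecordingSink` are hypothetical stand-ins I introduced so the example runs without a cluster; with Kudu, `apply` and `flush` would be the `KuduSession` calls, and the flush threshold would be chosen below the value passed to `setMutationBufferSpace`.

```java
import java.util.ArrayList;
import java.util.List;

final class ManualFlushSketch {

    /** Hypothetical stand-in for the two session calls the loop relies on. */
    interface BufferedSink<T> {
        void apply(T op);
        void flush();
    }

    /**
     * Applies every operation, flushing whenever flushEvery operations are pending,
     * then flushes the remainder. Returns the number of flush calls made.
     */
    static <T> int writeAll(BufferedSink<T> sink, Iterable<T> ops, int flushEvery) {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be positive");
        }
        int pending = 0;
        int flushes = 0;
        for (T op : ops) {
            sink.apply(op);
            pending++;
            if (pending >= flushEvery) {
                sink.flush();
                flushes++;
                pending = 0;
            }
        }
        if (pending > 0) { // final partial batch
            sink.flush();
            flushes++;
        }
        return flushes;
    }

    /** In-memory sink that records the size of each flushed batch (assumption, not Kudu code). */
    static final class RecordingSink implements BufferedSink<Integer> {
        final List<Integer> batchSizes = new ArrayList<>();
        private int buffered = 0;

        @Override
        public void apply(Integer op) {
            buffered++;
        }

        @Override
        public void flush() {
            batchSizes.add(buffered);
            buffered = 0;
        }
    }

    public static void main(String[] args) {
        List<Integer> ops = new ArrayList<>();
        for (int i = 0; i < 600; i++) {
            ops.add(i);
        }
        RecordingSink sink = new RecordingSink();
        int flushes = writeAll(sink, ops, 250);
        System.out.println(flushes + " flushes, batch sizes " + sink.batchSizes);
    }
}
```

With 600 operations and a threshold of 250, the sink sees batches of 250, 250, and 100, the last one coming from the final flush after the loop.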

