Flink SQL writes to Kudu


Kudu is a storage engine that has been getting a lot of attention lately. It combines the sequential-read strengths of HDFS with the random-read strengths of HBase, which makes it a great fit for IoT scenarios: data that has just arrived must be immediately accessible to end users, and the same data will later feed large-scale analytics.

Scenarios Kudu is suited for (the following list comes from the web):

1. Mixed workloads that need both random access and bulk data scans
2. CPU-intensive scenarios
3. Deployments with high-performance storage devices, including larger amounts of memory
4. Scenarios that require in-place data updates, avoiding repeated data migration
5. Cross-region real-time data replication and querying

Recently we seemed to be hitting a bottleneck on bulk reads from HBase and urgently needed a new solution, which is when I came across Kudu. After reading the introduction, it felt like a very good match for our use case: an IoT scenario where device-uploaded data must be queryable in real time, while we also need to run batch analysis over a device's data within a given time range.

To write data to Kudu: Flink does not yet ship an official connector, so the only option is the third-party connector provided by Apache Bahir. Unfortunately, the Kudu connector in bahir-flink has not been released yet, so for now you have to download the bahir-flink source from GitHub and build it yourself (the good news is that the build is straightforward; a standard Maven build such as `mvn clean install -DskipTests` should install the jar into your local repository).

bahir-flink : https://github.com/apache/bahir-flink/tree/master/flink-connector-kudu

Add the flink-connector-kudu dependency to sqlSubmit (1.1-SNAPSHOT is not a released artifact, so Maven resolves it from the locally installed build):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-kudu_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

The SQL is as follows:

-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'venn:9092'
  ,'properties.group.id' = 'user_log_x'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

-- kudu sink
drop table if exists user_log_sink;
CREATE TABLE user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,ts  TIMESTAMP(3)
) WITH (
  'connector.type' = 'kudu'
  ,'kudu.masters' = 'venn:7051,venn:7151,venn:7251'
  ,'kudu.table' = 'user_log'
  ,'kudu.hash-columns' = 'user_id'
  ,'kudu.primary-key-columns' = 'user_id'
  ,'kudu.max-buffer-size' = '5000'
  ,'kudu.flush-interval' = '1000'
);
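The sink writes to the Kudu table user_log; 'kudu.primary-key-columns' and 'kudu.hash-columns' describe its key and partitioning, and depending on the bahir-flink build the connector may create the table automatically. If it does not exist on your cluster, here is a minimal sketch of creating a matching table with the Kudu Java client (the bucket and replica counts are assumptions):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

public class CreateUserLogTable {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("venn:7051,venn:7151,venn:7251").build();
        try {
            List<ColumnSchema> columns = new ArrayList<>();
            // user_id is the primary key, matching 'kudu.primary-key-columns'
            columns.add(new ColumnSchema.ColumnSchemaBuilder("user_id", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("item_id", Type.STRING).nullable(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("category_id", Type.STRING).nullable(true).build());
            // TIMESTAMP(3) maps to Kudu's UNIXTIME_MICROS
            columns.add(new ColumnSchema.ColumnSchemaBuilder("ts", Type.UNIXTIME_MICROS).nullable(true).build());

            CreateTableOptions options = new CreateTableOptions()
                    .addHashPartitions(Collections.singletonList("user_id"), 3) // bucket count: assumption
                    .setNumReplicas(3);                                         // replica count: assumption
            client.createTable("user_log", new Schema(columns), options);
        } finally {
            client.shutdown();
        }
    }
}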

-- insert
insert into user_log_sink
select user_id, item_id, category_id, ts
from user_log;
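sqlSubmit takes care of parsing and submitting these statements. For reference, a minimal standalone equivalent using the Flink Table API might look like the sketch below (the class name is made up, and the elided DDL strings stand for the full statements above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KuduSqlSubmitSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // paste the full CREATE TABLE statements from above into these calls
        tEnv.executeSql("CREATE TABLE user_log (...) WITH ('connector' = 'kafka', ...)");
        tEnv.executeSql("CREATE TABLE user_log_sink (...) WITH ('connector.type' = 'kudu', ...)");

        // executeSql on an INSERT submits the streaming job directly; no env.execute() needed
        tEnv.executeSql("insert into user_log_sink select user_id, item_id, category_id, ts from user_log");
    }
}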

To check the data, read it back with the Kudu Java API:

// imports needed by the snippet below
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

// KUDU_MASTER and TABLE_NAME are fields of the enclosing class (see the wrapper sketch below)
private void queryData() throws KuduException {
    KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

    KuduTable table = client.openTable(TABLE_NAME);

    Schema schema = table.getSchema();
    KuduScanner scanner = null;
    try {
        // only fetch the columns we need
        List<String> projectColumn = new ArrayList<>();
        projectColumn.add("user_id");
        projectColumn.add("item_id");
        projectColumn.add("category_id");
        projectColumn.add("ts");

        // bound user_id between ' ' and '~' (the printable ASCII range), effectively a full scan
        KuduPredicate lowPredicate = KuduPredicate.newComparisonPredicate(schema.getColumn("user_id"), KuduPredicate.ComparisonOp.GREATER_EQUAL, " ");
        KuduPredicate upPredicate = KuduPredicate.newComparisonPredicate(schema.getColumn("user_id"), KuduPredicate.ComparisonOp.LESS_EQUAL, "~");

        scanner = client.newScannerBuilder(table)
                .setProjectedColumnNames(projectColumn)
                .addPredicate(lowPredicate)
                .addPredicate(upPredicate)
                .build();

        long start = System.currentTimeMillis();
        int count = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator results = scanner.nextRows();
            while (results.hasNext()) {
                RowResult result = results.next();
                StringBuilder builder = new StringBuilder();
                List<ColumnSchema> list = result.getSchema().getColumns();
                for (ColumnSchema columnSchema : list) {
                    String columnName = columnSchema.getName();
                    Type columnType = columnSchema.getType();

                    // always check isNull before reading: the typed getters throw on null cells
                    switch (columnType) {
                        case STRING: {
                            if (!result.isNull(columnName)) {
                                builder.append(columnName + " : " + result.getString(columnName)).append(", ");
                            }
                            break;
                        }
                        case UNIXTIME_MICROS: {
                            if (!result.isNull(columnName)) {
                                Timestamp ts = result.getTimestamp(columnName);
                                // DateTimeUtil is the author's formatting helper
                                builder.append(columnName + " : " + DateTimeUtil.formatMillis(ts.getTime(), DateTimeUtil.YYYY_MM_DD_HH_MM_SS));
                            }
                            break;
                        }
                        case INT8: {
                            if (!result.isNull(columnName)) {
                                byte tmp = result.getByte(columnName);
                                builder.append(columnName + " : " + tmp);
                            }
                            break;
                        }
                        default: {
                            builder.append(columnName + " : ");
                        }
                    }
                }

                System.out.println(builder.toString());
                ++count;
            }
        }
        System.out.println("result count : " + count);

        long end = System.currentTimeMillis();
        System.out.println("cost : " + (end - start));
    } finally {
        if (scanner != null) {
            scanner.close();
        }
        client.shutdown();
    }
}
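For completeness, a minimal enclosing class to run the method above (the class name is an assumption; the constant values come from the SQL DDL):

public class KuduReadDemo {
    private static final String KUDU_MASTER = "venn:7051,venn:7151,venn:7251"; // from 'kudu.masters'
    private static final String TABLE_NAME = "user_log";                      // from 'kudu.table'

    public static void main(String[] args) throws KuduException {
        new KuduReadDemo().queryData();
    }

    // queryData() from above goes here
}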

The output looks like this:

user_id : user_id_9982, item_id : item_id_1, category_id : category_id_1, ts : 2021-04-16 16:14:45
user_id : user_id_9986, item_id : item_id_9, category_id : category_id_9, ts : 2021-04-16 16:14:45
user_id : user_id_9989, item_id : item_id_2, category_id : category_id_2, ts : 2021-04-16 16:14:45
user_id : user_id_9991, item_id : item_id_8, category_id : category_id_8, ts : 2021-04-16 16:14:45
user_id : user_id_9992, item_id : item_id_2, category_id : category_id_2, ts : 2021-04-16 16:14:45
user_id : user_id_9994, item_id : item_id_3, category_id : category_id_3, ts : 2021-04-16 16:14:45
user_id : user_id_9995, item_id : item_id_7, category_id : category_id_7, ts : 2021-04-16 16:14:45
user_id : user_id_9999, item_id : item_id_7, category_id : category_id_7, ts : 2021-04-16 16:14:45
result count : 65867
cost : 863
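For reference, the scanner builder also exposes a few tuning knobs for bulk scans; here is a sketch of the same scan with adjusted settings (the values are assumptions, not benchmarked):

import org.apache.kudu.client.AsyncKuduScanner;

KuduScanner tuned = client.newScannerBuilder(table)
        .setProjectedColumnNames(projectColumn)
        .batchSizeBytes(8 * 1024 * 1024)                 // larger RPC batches for bulk scans
        .readMode(AsyncKuduScanner.ReadMode.READ_LATEST) // the default; READ_AT_SNAPSHOT trades speed for consistency
        .build();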

Welcome to follow the Flink菜鳥 WeChat official account, which posts Flink development articles from time to time.
