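Kudu is a fairly popular storage engine right now. It combines the sequential-read strength of HDFS with the random-read strength of HBase, which makes it a good fit for Internet of Things (IoT) scenarios: data that has just arrived must be accessible to end users immediately, and later it also has to support large-scale analysis.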
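Scenarios where Kudu fits (the following is taken from the web):
1. Mixed workloads that have both random access and bulk data scans
2. CPU-intensive workloads
3. Deployments on high-performance storage devices, including ones that use more memory
4. Scenarios that need data updates and want to avoid migrating data back and forth
5. Cross-region real-time data backup and query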
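Recently our bulk reads from HBase seemed to hit a bottleneck, and we urgently needed a new solution. That is when I came across Kudu. After reading the introduction, it looked like a very good fit for our scenario: an IoT scenario in which data uploaded by devices must be queryable in real time and also analyzed in bulk over a device's time range.
For writing data to Kudu, Flink currently has no official connector, so the only option is the package provided by the third-party bahir project. Unfortunately, the Kudu connector in bahir-flink has not been released yet, so for now you have to download the bahir-flink source from GitHub and build it yourself (the good news is that the build is simple).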
bahir-flink : https://github.com/apache/bahir-flink/tree/master/flink-connector-kudu
Add the flink-connector-kudu dependency to sqlSubmit:
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-kudu_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
The SQL is as follows:
-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior INT
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  ,WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior'
  ,'properties.bootstrap.servers' = 'venn:9092'
  ,'properties.group.id' = 'user_log_x'
  ,'scan.startup.mode' = 'group-offsets'
  ,'format' = 'json'
);

-- kudu sink
drop table if exists user_log_sink;
CREATE TABLE user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,ts TIMESTAMP(3)
) WITH (
  'connector.type' = 'kudu'
  ,'kudu.masters' = 'venn:7051,venn:7151,venn:7251'
  ,'kudu.table' = 'user_log'
  ,'kudu.hash-columns' = 'user_id'
  ,'kudu.primary-key-columns' = 'user_id'
  ,'kudu.max-buffer-size' = '5000'
  ,'kudu.flush-interval' = '1000'
);

-- insert
insert into user_log_sink
select user_id, item_id, category_id, ts
from user_log;
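The two sink options 'kudu.max-buffer-size' and 'kudu.flush-interval' suggest size-or-time batching: rows are buffered and written out once enough of them have accumulated or enough time has passed. The sketch below illustrates that general idea only. It is not the bahir connector's or Kudu's code; the class SizeOrIntervalBuffer and all of its members are hypothetical, and I am assuming that the size is counted in rows and the interval is in milliseconds. To keep it short, the interval is only checked when a row is added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of size-or-interval batching; not the connector's code. */
final class SizeOrIntervalBuffer<T> {
    private final int maxBufferSize;          // assumed analogue of 'kudu.max-buffer-size'
    private final long flushIntervalMillis;   // assumed analogue of 'kudu.flush-interval'
    private final Consumer<List<T>> flusher;  // receives each flushed batch
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMillis;

    SizeOrIntervalBuffer(int maxBufferSize, long flushIntervalMillis,
                         long nowMillis, Consumer<List<T>> flusher) {
        this.maxBufferSize = maxBufferSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
        this.flusher = flusher;
    }

    /** Buffers one row, then flushes if the buffer is full or the interval has elapsed. */
    void add(T row, long nowMillis) {
        buffer.add(row);
        boolean full = buffer.size() >= maxBufferSize;
        boolean due = nowMillis - lastFlushMillis >= flushIntervalMillis;
        if (full || due) {
            flush(nowMillis);
        }
    }

    /** Hands a copy of the buffered rows to the flusher and resets the timer. */
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }

    public static void main(String[] args) {
        SizeOrIntervalBuffer<String> buf = new SizeOrIntervalBuffer<>(
                5000, 1000L, System.currentTimeMillis(),
                batch -> System.out.println("flushing " + batch.size() + " rows"));
        buf.add("user_id_1", System.currentTimeMillis());
        buf.flush(System.currentTimeMillis());
    }
}
```

With the values from the SQL above, a batch would go out after 5000 buffered rows or after about one second, whichever comes first. The clock is passed in as a parameter so the behavior can be checked without sleeping.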
Check the data:

Reading the data with the Java API:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// KUDU_MASTER, TABLE_NAME and DateTimeUtil are defined elsewhere in the project.
private void queryData() throws KuduException {
    KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
    KuduTable table = client.openTable(TABLE_NAME);
    Schema schema = table.getSchema();
    KuduScanner scanner = null;
    try {
        // Columns to read back
        List<String> projectColumn = new ArrayList<>();
        projectColumn.add("user_id");
        projectColumn.add("item_id");
        projectColumn.add("category_id");
        projectColumn.add("ts");

        // Range scan on the primary key: " " <= user_id <= "~"
        KuduPredicate lowPredicate = KuduPredicate.newComparisonPredicate(
                schema.getColumn("user_id"), KuduPredicate.ComparisonOp.GREATER_EQUAL, " ");
        KuduPredicate upPredicate = KuduPredicate.newComparisonPredicate(
                schema.getColumn("user_id"), KuduPredicate.ComparisonOp.LESS_EQUAL, "~");
        scanner = client.newScannerBuilder(table)
                .setProjectedColumnNames(projectColumn)
                .addPredicate(lowPredicate)
                .addPredicate(upPredicate)
                .build();

        long start = System.currentTimeMillis();
        int count = 0;
        while (scanner.hasMoreRows()) {
            RowResultIterator results = scanner.nextRows();
            while (results.hasNext()) {
                RowResult result = results.next();
                StringBuilder builder = new StringBuilder();
                List<ColumnSchema> list = result.getSchema().getColumns();
                for (ColumnSchema schema1 : list) {
                    String columnName = schema1.getName();
                    Type columnType = schema1.getType();
                    switch (columnType) {
                        case STRING: {
                            // Check for null before reading the value
                            if (!result.isNull(columnName)) {
                                String tmp = result.getString(columnName);
                                builder.append(columnName + " : " + tmp).append(", ");
                            }
                            break;
                        }
                        case UNIXTIME_MICROS: {
                            if (!result.isNull(columnName)) {
                                Timestamp ts = result.getTimestamp(columnName);
                                builder.append(columnName + " : " + DateTimeUtil.formatMillis(ts.getTime(), DateTimeUtil.YYYY_MM_DD_HH_MM_SS));
                            }
                            break;
                        }
                        case INT8: {
                            if (!result.isNull(columnName)) {
                                byte tmp = result.getByte(columnName);
                                builder.append(columnName + " : " + tmp);
                            }
                            break;
                        }
                        default: {
                            // Other types are not decoded; only the column name is printed
                            builder.append(columnName + " : ");
                        }
                    }
                }
                System.out.println(builder.toString());
                ++count;
            }
        }
        System.out.println("result count : " + count);
        long end = System.currentTimeMillis();
        // Elapsed scan time in milliseconds
        System.out.println("cost : " + (end - start));
    } finally {
        if (scanner != null) {
            scanner.close();
        }
        client.shutdown();
    }
}
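The two predicates in the scan bracket user_id between " " and "~", the lowest and highest printable ASCII characters, so for ids like user_id_9982 the range behaves almost like a full scan. The sketch below mimics that check with plain Java string comparison. It assumes ASCII-only ids, for which Java's String.compareTo order should agree with a byte-wise comparison; the class UserIdRangeCheck and its method are hypothetical and are not part of the Kudu client.

```java
/** Hypothetical helper that mirrors: user_id >= " " AND user_id <= "~" (ASCII ids assumed). */
final class UserIdRangeCheck {

    static boolean inScanRange(String userId) {
        return userId.compareTo(" ") >= 0 && userId.compareTo("~") <= 0;
    }

    public static void main(String[] args) {
        // Starts with 'u', which sorts between ' ' and '~'
        System.out.println(inScanRange("user_id_9982")); // true
        // "~tail" is longer than "~" with the same first character, so it sorts after "~"
        System.out.println(inScanRange("~tail"));        // false
        // The empty string sorts before " "
        System.out.println(inScanRange(""));             // false
    }
}
```

One consequence is that an id starting with "~" followed by anything else, or an empty id, would fall outside the range.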
The output is as follows:
user_id : user_id_9982, item_id : item_id_1, category_id : category_id_1, ts : 2021-04-16 16:14:45
user_id : user_id_9986, item_id : item_id_9, category_id : category_id_9, ts : 2021-04-16 16:14:45
user_id : user_id_9989, item_id : item_id_2, category_id : category_id_2, ts : 2021-04-16 16:14:45
user_id : user_id_9991, item_id : item_id_8, category_id : category_id_8, ts : 2021-04-16 16:14:45
user_id : user_id_9992, item_id : item_id_2, category_id : category_id_2, ts : 2021-04-16 16:14:45
user_id : user_id_9994, item_id : item_id_3, category_id : category_id_3, ts : 2021-04-16 16:14:45
user_id : user_id_9995, item_id : item_id_7, category_id : category_id_7, ts : 2021-04-16 16:14:45
user_id : user_id_9999, item_id : item_id_7, category_id : category_id_7, ts : 2021-04-16 16:14:45
result count : 65867
cost : 863
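The ts values are formatted by DateTimeUtil.formatMillis, a helper from my own project that is not shown in this post. A minimal stand-in using java.time might look like the sketch below. The class name MillisFormatter and the explicit ZoneId parameter are my assumptions here; the pattern yyyy-MM-dd HH:mm:ss is inferred from the constant name YYYY_MM_DD_HH_MM_SS and from the output.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in for the DateTimeUtil helper used in the scan code. */
final class MillisFormatter {
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats epoch milliseconds as yyyy-MM-dd HH:mm:ss in the given time zone. */
    static String formatMillis(long epochMillis, ZoneId zone) {
        return FORMATTER.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Prints the current time in the JVM's default zone
        System.out.println(formatMillis(System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

Passing the zone explicitly avoids surprises when the client JVM and the cluster run in different time zones.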

