有人會問,為啥要用這個叫啥Kudu的,Kudu是啥?
就像官網所說,Kudu是一個針對Apache hadoop 平台而開發的列式存儲管理器,在本菜鳥看來,它是一種介於hdfs與hbase的一種存儲。它的優勢在於:
1、OLAP工作的快速處理,也就是針對於查詢,很快,很牛逼。
2、針對同時運行順序和隨機工作負載的情況性能很好。
3、高可用,Table server和master使用Raft Consensus Algorithm節點來保證高可用,什么是Raft Consunsus Algorith?參考:https://www.cnblogs.com/mindwind/p/5231986.html),只要有一半以上的副本可用,該tablet便可用於讀寫。
4、結構化數據模型(可以理解為帶schema)。
該圖顯示了一個具有三個 master 和多個 tablet server 的 Kudu 集群,每個服務器都支持多個 tablet。它說明了如何使用 Raft 共識來允許 master 和 tablet server 的 leader 和 f ollow。此外,tablet server 可以成為某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色顯示,而 follower 則顯示為藍色。
下面是一些基本概念:
Table(表)
一張 talbe 是數據存儲在 Kudu 的位置。表具有 schema 和全局有序的 primary key(主鍵)。table 被分成稱為 tablets 的 segments。
Tablet
一個 tablet 是一張 table 連續的 segment,與其它數據存儲引擎或關系型數據庫中的 partition(分區)相似。給定的 tablet 冗余到多個 tablet 服務器上,並且在任何給定的時間點,其中一個副本被認為是 leader tablet。任何副本都可以對讀取進行服務,並且寫入時需要在為 tablet 服務的一組 tablet server之間達成一致性。
Tablet Server
一個 tablet server 存儲 tablet 和為 tablet 向 client 提供服務。對於給定的 tablet,一個 tablet server 充當 leader,其他 tablet server 充當該 tablet 的 follower 副本。只有 leader服務寫請求,然而 leader 或 followers 為每個服務提供讀請求。leader 使用 Raft Consunsus Algorithm來進行選舉 。一個 tablet server 可以服務多個 tablets ,並且一個 tablet 可以被多個 tablet servers 服務着。
具體我還沒有那么深入,寫了些api調用玩了一把,下面慢慢講述,Kudu的API比較惡心的哈。。
kudu的sql語法與傳統的sql語法比較相似,但也不盡相同,直接解析時,具體sql語法請參考官網,下面以類似hive metastore表結構的方式封裝了下。以下列sql為例:
create table combined_t6 (x int64, s string, s2 string, primary key (x, s))
partition by hash (x) partitions 10, range (x)
(
partition 0 <= values <= 49, partition 50 <= values <= 100
) REPLICAS 1
public Boolean create(Table table,String operator) { LOGGER.info("kudu Table properties:" + table.getKvInfos().toString()); List<ColumnSchema> columns = new ArrayList(table.getTableColumnList().size());
KuduTableGenerateUtil.generateKuduColumn(table.getTableColumnList(),columns); Schema schema = new Schema(columns); KuduPartitionSchema kuduPartitionSchema = KuduTableGenerateUtil.parserPartition(table); CreateTableOptions tableOptions = KuduTableGenerateUtil.generateKuduTableOptions(table,schema,kuduPartitionSchema); try { getKuduClient(table).createTable(table.getTableName(), schema,tableOptions); } catch (KuduException e) { throw new MetadataInvalidObjectException(e, " create kudu storage table error!!"); } return true; }
kudu的column屬性中,包含有primarfyKey、encoding、compression algorithm、null table 、default value 、block size等屬性,所以從上述代碼中需要先將kuduColumn進行封裝,構造ColumnSchema對象:
new ColumnSchema.ColumnSchemaBuilder(tableColumn.getColumnName(), getKuduColumnType(tableColumn.getDataType())) .key(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_PRIMARY_KEY))) .nullable(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_IS_NULLTABLE))) .defaultValue(defaultValue) .desiredBlockSize(getDesiredBlockSize(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_DESIRED_BLOCKSIZE))) .encoding(getColumnEncoding(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_ENCODING))) .compressionAlgorithm(getCompressionType(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_COMPRESSION_ALGORITHM))) .build();
對於column的數據類型,有很多種,如下:
private static Type getKuduColumnType(String dataType) { switch (dataType.toUpperCase()) { case "INT8": return Type.INT8; case "INT16": return Type.INT16; case "INT32": return Type.INT32; case "INT64": return Type.INT64; case "BINARY": return Type.BINARY; case "STRING": return Type.STRING; case "BOOL": return Type.BOOL; case "FLOAT": return Type.FLOAT; case "DOUBLE": return Type.DOUBLE; case "UNIXTIME_MICROS": return Type.UNIXTIME_MICROS; default: return Type.STRING; } }
壓縮方式包括:
public static CompressionAlgorithm getCompressionType(String compressionType) { if (StringUtils.isNotBlank(compressionType)) { switch (compressionType.toUpperCase()) { case "UNKNOWN": return CompressionAlgorithm.UNKNOWN; case "DEFAULT_COMPRESSION": return CompressionAlgorithm.DEFAULT_COMPRESSION; case "NO_COMPRESSION": return CompressionAlgorithm.NO_COMPRESSION; case "SNAPPY": return CompressionAlgorithm.SNAPPY; case "LZ4": return CompressionAlgorithm.LZ4; case "ZLIB": return CompressionAlgorithm.UNKNOWN.ZLIB; default: return null; } } return null; }
隨之我們要構造,Kudu Partition,Kudu Partition包含兩種類型,一種是hashPartition,一種是rangePartition,其實從字面意思應該也能夠想到,一種是用於對某個字段進行hash散列,一種是進行分區區間的設置,從而在查詢時達到優化的效果,這里通過將sql解析后的轉換的KuduPartitionSchema對象分別進行range與hash partition的組裝,也就是將sql中 Partition表達式 partition 0 <= values <= 49, partition 50 <= values <= 100 封裝:
public static void generateHashPartition(CreateTableOptions tableOptions, List<HashPartitionSchema> hashPartitionSchemas) { if (null != hashPartitionSchemas && hashPartitionSchemas.size() != 0) {
hashPartitionSchemas.forEach(hashPartitionSchema ->{
tableOptions.addHashPartitions(hashPartitionSchema.getColumns(), hashPartitionSchema.getBucket());
});
}
}
public static void generateRangePartition(Schema schema, CreateTableOptions tableOptions, RangePartitionSchema rangePartitionSchema) { tableOptions.setRangePartitionColumns(rangePartitionSchema.getColumns()); List<RangeSplit> ranges = rangePartitionSchema.getRanges(); ranges.forEach(range -> { tableOptions.addRangePartition( getPartialRow( range.getLower(), schema, rangePartitionSchema.getColumns()), getPartialRow( range.getUpper(), schema, rangePartitionSchema.getColumns()), getRangePartitionBound( range.getLowerBoundType()), getRangePartitionBound( range.getUpperBoundType()) ); }); }
public static RangePartitionBound getRangePartitionBound(String boundType) { if (StringUtils.isNotBlank(boundType)) { switch (boundType) { case "EXCLUSIVE_BOUND": return RangePartitionBound.EXCLUSIVE_BOUND; case "INCLUSIVE_BOUND": return RangePartitionBound.INCLUSIVE_BOUND; default: return null; } } return null; }
最后構造,CreateTableOptions對象:
public static CreateTableOptions generateKuduTableOptions(Table table, Schema schema, KuduPartitionSchema kuduPartitionSchema) { CreateTableOptions tableOptions = new CreateTableOptions(); String numReplicas = table.getKvInfos().get(MetadataConfigKey.TABLE_KUDU_REPLICAS); if (StringUtils.isNotBlank(numReplicas)) { tableOptions.setNumReplicas(Integer.valueOf(numReplicas)); } if (kuduPartitionSchema.getHashPartitionSchemaList() != null && kuduPartitionSchema.getHashPartitionSchemaList().size() != 0) { generateHashPartition(tableOptions, kuduPartitionSchema.getHashPartitionSchemaList()); } if (kuduPartitionSchema.getRangePartitionSchema() != null) { generateRangePartition(schema, tableOptions, kuduPartitionSchema.getRangePartitionSchema()); } return tableOptions; }
沒有hbase編程便捷。。不過對於kudu的連接而言,只需要配置kudu master的地址,便可創建連接。
public KuduClient getKuduClient(Table table){ if(null == kuduClient){ try{ String kuduMaster = table.getStorageClusterKvs().get(MetadataConfigKey.CLUSTER_KUDU_MASTER); kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build(); }catch(Exception e){ throw new MetadataRuntimeException(e, " create kuduClient error!!"); } } return kuduClient; }
活兒干不完啊~改天再深入完 哈哈~