Kudu:
A columnar storage manager developed for the Apache Hadoop platform.
Use cases:
Mixed workloads that require both random access and bulk data scans.
Compute-intensive scenarios.
Deployments that use high-performance storage devices, including larger amounts of memory.
Workloads that need in-place data updates, avoiding repeated data migration.
Cross-region real-time data replication and querying.
Key mechanisms of Kudu:
1. Like a relational database, data is organized into two-dimensional tables, and a schema must be specified when the table is created, so only structured data is supported.
2. Each table has one or more primary key columns.
3. insert/update/delete are supported; every one of these mutations must specify the primary key.
4. Reads are performed only through the scan primitive.
5. Consistency model: snapshot consistency by default, which guarantees read-your-writes consistency for scans from a single client. For stronger guarantees, Kudu offers manually propagating timestamps between clients, or commit-wait.
6. The cluster uses a simple master-slave structure similar to HBase; the master can be replicated for backup.
7. A table is horizontally partitioned into tablets; a given row always lives in exactly one tablet. Range partitioning is supported, as well as hash partitioning for more flexible partition keys (see the sketch after this list).
8. The Raft protocol is used for replication; the number of replicas can be chosen according to the SLA.
9. Columnar storage.
10. Delta flushes: updates are first written to memory and later merged into the base storage by a dedicated background process.
11. Lazy materialization: for selective predicates, this helps skip reading a lot of unnecessary data.
12. Integration with MR/Spark/Impala and others, with support for locality, columnar projection, predicate pushdown, and so on.
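To make items 5, 7 and 8 concrete, here is a minimal sketch using the synchronous Java client (kudu-client). The table name "metrics", its columns (host, ts, metric), the 4 hash buckets and the replica count of 3 are illustrative assumptions, not values taken from this article; the master address reuses the nn02:7051 from the code below.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        // Assumed master address; replace with your own
        KuduClient client = new KuduClient.KuduClientBuilder("nn02:7051").build();
        try {
            // Composite primary key (host, ts); key columns must come first in the schema
            List<ColumnSchema> columns = new ArrayList<>();
            columns.add(new ColumnSchema.ColumnSchemaBuilder("host", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("ts", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("metric", Type.DOUBLE).build());
            Schema schema = new Schema(columns);

            // Item 7: hash partitioning on host (4 buckets) combined with range partitioning on ts
            // Item 8: 3 replicas, kept consistent by Raft
            CreateTableOptions options = new CreateTableOptions()
                    .addHashPartitions(Arrays.asList("host"), 4)
                    .setRangePartitionColumns(Arrays.asList("ts"))
                    .setNumReplicas(3);
            client.createTable("metrics", schema, options);

            // Item 5: scan under the default snapshot consistency, requested explicitly here
            KuduTable table = client.openTable("metrics");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
                    .setProjectedColumnNames(Arrays.asList("host", "metric"))
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator rows = scanner.nextRows();
                while (rows.hasNext()) {
                    RowResult row = rows.next();
                    System.out.println(row.getString("host") + " -> " + row.getDouble("metric"));
                }
            }
        } finally {
            client.shutdown();
        }
    }
}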
Notes:
1. When creating a table, all tserver nodes must be alive.
2. Under the Raft mechanism, the cluster keeps running as long as no more than (replication factor - 1) / 2 replicas are down (for example, with 3 replicas one tablet server may be down); beyond that, errors are reported such as being unable to reach ip:7050 (7050 is the RPC port). Note that on the first run the cluster must be in a healthy state, i.e., all services started; only during later operation may up to (replication factor - 1) / 2 replicas go down.
3. Reads work as long as at least one replica is alive.
Maven dependencies:
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client-tools</artifactId>
    <version>1.7.0</version>
</dependency>
Java code:
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @Author: Xavier
 * @Date: 2019-02-22 09:25
 **/
public class KuduOption {

    // Kudu master address
    private static final String KUDU_MASTER = "nn02:7051";
    private static String tableName = "KuduTest";

    // Create a table
    @Test
    public void CreateTab() {
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            // Define the table schema
            List<ColumnSchema> columns = new ArrayList<>(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
            Schema schema = new Schema(columns);

            // Options supplied at table-creation time
            CreateTableOptions options = new CreateTableOptions();
            // Configure replication and partitioning
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");
            // a single replica
            options.setNumReplicas(1);
            // Range-partition on the rangeKeys columns
            options.setRangePartitionColumns(rangeKeys);
            // Hash partitioning on the key could be added as well:
            // options.addHashPartitions(parcols, 3);
            client.createTable(tableName, schema, options);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Insert new rows into the table
    @Test
    public void InsertData() {
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            // Open the table
            KuduTable table = client.openTable(tableName);
            // Create a write session; all Kudu writes go through a session
            KuduSession session = client.newSession();
            // Use manual flush mode
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(3000);
            System.out.println("-------start--------" + System.currentTimeMillis());
            for (int i = 1; i < 6100; i++) {
                Insert insert = table.newInsert();
                // Set the column values
                PartialRow row = insert.getRow();
                row.addString("key", i + "");
                row.addString(1, "value" + i);
                // Buffer the operation; flush periodically so the mutation buffer does not overflow
                session.apply(insert);
                if (i % 1000 == 0) {
                    session.flush();
                }
            }
            // Flush whatever is still buffered and close the session
            session.flush();
            session.close();
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Update rows
    @Test
    public void kuduUpdateTest() {
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Update update = table.newUpdate();
            PartialRow row = update.getRow();
            // An update must specify the primary key of the row to change
            row.addString("key", 998 + "");
            row.addString("value", "updated value " + 10);
            OperationResponse operationResponse = session.apply(update);
            if (operationResponse.hasRowError()) {
                System.out.println(operationResponse.getRowError());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Delete a row by primary key
    @Test
    public void deleteData() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Delete delete = table.newDelete();
            PartialRow row = delete.getRow();
            row.addString("key", "992");
            session.apply(delete);
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Scan data
    @Test
    public void SearchData() {
        // Create the Kudu client connection
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            // Project only the "value" column
            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Scan data with a predicate
    @Test
    public void searchDataByCondition() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(table);
            // Build the search predicate
            KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
                    table.getSchema().getColumn("key"),   // column to compare
                    KuduPredicate.ComparisonOp.EQUAL,     // comparison operator
                    "991");                               // comparison value
            scannerBuilder.addPredicate(predicate);
            // Start the scan
            KuduScanner scanner = scannerBuilder.build();
            while (scanner.hasMoreRows()) {
                RowResultIterator iterator = scanner.nextRows();
                while (iterator.hasNext()) {
                    RowResult result = iterator.next();
                    System.out.println("Output: " + result.getString(0) + "--" + result.getString("value"));
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Drop the table
    @Test
    public void DelTab() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Query the table through Spark SQL
    // @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .schema(schema)
                .option("kudu.master", "nn02:7051")
                .option("kudu.table", "KuduTest")
                .load();
        ds.createOrReplaceTempView("abc");
        sparkSession.sql("select * from abc").show();
    }

    // Check whether the table exists via KuduContext
    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext());
        System.out.println(tableName + " exists: " + context.tableExists(tableName));
    }

    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");
        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        return SparkSession.builder().sparkContext(sparkContext).getOrCreate();
    }
}
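Conversely, a Spark DataFrame can be written into the Kudu table through KuduContext as well. Below is a minimal sketch under the same assumptions as the code above (the KuduTest table with key/value string columns, the nn02:7051 master, local Spark); the class name and the sample rows are made-up for illustration.

import java.util.Arrays;
import java.util.List;

import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class KuduSparkWriteSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kudu-write-sketch")
                .master("local[*]")
                .getOrCreate();

        // Schema matching the KuduTest table created above (key STRING, value STRING)
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, false),
                DataTypes.createStructField("value", DataTypes.StringType, true)));

        // A couple of made-up rows to insert
        List<Row> rows = Arrays.asList(
                RowFactory.create("10001", "value10001"),
                RowFactory.create("10002", "value10002"));
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // KuduContext performs the actual writes; insertRows fails on duplicate keys,
        // while upsertRows would overwrite existing rows instead
        KuduContext kuduContext = new KuduContext("nn02:7051", spark.sparkContext());
        kuduContext.insertRows(df, "KuduTest");

        spark.stop();
    }
}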