kudu基礎入門


1、kudu介紹

1.1 背景介紹

在KUDU之前,大數據主要以兩種方式存儲;

(1)靜態數據:

以 HDFS 引擎作為存儲引擎,適用於高吞吐量的離線大數據分析場景。這類存儲的局限性是數據無法進行隨機的讀寫。

(2)動態數據:

以 HBase、Cassandra 作為存儲引擎,適用於大數據隨機讀寫場景。這類存儲的局限性是批量讀取吞吐量遠不如 HDFS,不適用於批量數據分析的場景。

從上面分析可知,這兩種數據在存儲方式上完全不同,進而導致使用場景完全不同,但在真實的場景中,邊界可能沒有那么清晰,面對既需要隨機讀寫,又需要批量分析的大數據場景,該如何選擇呢?這個場景中,單種存儲引擎無法滿足業務需求,我們需要通過多種大數據工具組合來滿足這一需求。

   

如上圖所示,數據實時寫入 HBase,實時的數據更新也在 HBase 完成,為了應對 OLAP 需求,我們定時(通常是 T+1 或者 T+H)將 HBase 數據寫成靜態的文件(如:Parquet)導入到 OLAP 引擎(如:HDFS)。這一架構能滿足既需要隨機讀寫,又可以支持 OLAP 分析的場景,但他有如下缺點:

(1)架構復雜。從架構上看,數據在HBase、消息隊列、HDFS 間流轉,涉及環節太多,運維成本很高。並且每個環節需要保證高可用,都需要維護多個副本,存儲空間也有一定的浪費。最后數據在多個系統上,對數據安全策略、監控等都提出了挑戰。

(2)時效性低。數據從HBase導出成靜態文件是周期性的,一般這個周期是一天(或一小時),在時效性上不是很高。

(3)難以應對后續的更新。真實場景中,總會有數據是延遲到達的。如果這些數據之前已經從HBase導出到HDFS,新到的變更數據就難以處理了,一個方案是把原有數據應用上新的變更后重寫一遍,但這代價又很高。

為了解決上述架構的這些問題,KUDU應運而生。KUDU的定位是Fast Analytics on Fast Data,是一個既支持隨機讀寫、又支持 OLAP 分析的大數據存儲引擎。

   

   

從上圖可以看出,KUDU 是一個折中的產品,在 HDFS 和 HBase 這兩個偏科生中平衡了隨機讀寫和批量分析的性能。從 KUDU 的誕生可以說明一個觀點:底層的技術發展很多時候都是上層的業務推動的,脫離業務的技術很可能是空中樓閣。

1.2 kudu是什么

Apache Kudu是由Cloudera開源的存儲引擎,可以同時提供低延遲的隨機讀寫和高效的數據分析能力。它是一個融合HDFS和HBase的功能的新組件,具備介於兩者之間的新存儲組件。

Kudu支持水平擴展,並且與Cloudera Impala和Apache Spark等當前流行的大數據查詢和分析工具結合緊密。

1.3 kudu的應用場景

Strong performance for both scan and random access to help customers simplify complex hybrid architectures(適用於那些既有隨機訪問,也有批量數據掃描的復合場景)

High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高計算量的場景)

High IO efficiency in order to leverage modern persistent storage(使用了高性能的存儲設備,包括使用更多的內存)

The ability to update data in place, to avoid extraneous processing and data movement(支持數據更新,避免數據反復遷移)

The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的實時數據備份和查詢)

國內使用的kudu一些案例可以查看《構建近實時分析系統.pdf》文檔。

2、Kudu的架構

與HDFS和HBase相似,Kudu使用單個的Master節點,用來管理集群的元數據,並且使用任意數量的Tablet Server(可對比理解HBase中的RegionServer角色)節點用來存儲實際數據。可以部署多個Master節點來提高容錯性。一個table表的數據,被分割成1個或多個Tablet,Tablet被部署在Tablet Server來提供數據讀寫服務。

下面是一些基本概念:

Master:集群中的老大,負責集群管理、元數據管理等功能

Tablet Server:集群中的小弟,負責數據存儲,並提供數據讀寫服務

一個 tablet server 存儲了table表的tablet 和為 tablet 向 client 提供服務。對於給定的 tablet,一個tablet server 充當 leader,其他 tablet server 充當該 tablet 的 follower 副本。

只有 leader服務寫請求,然而 leader 或 followers 為每個服務提供讀請求 。一個 tablet server 可以服務多個 tablets ,並且一個 tablet 可以被多個 tablet servers 服務着。

Table(表)

一張talbe是數據存儲在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主鍵)。table 被分成稱為 tablets 的 segments。

Tablet

一個 tablet 是一張 table連續的segment,tablet是kudu表的水平分區,類似於google Bigtable的tablet,或者HBase的region。每個tablet存儲着一定連續range的數據(key),且tablet兩兩間的range不會重疊。一張表的所有tablet包含了這張表的所有key空間。與其它數據存儲引擎或關系型數據庫中的 partition(分區)相似。給定的tablet 冗余到多個 tablet 服務器上,並且在任何給定的時間點,其中一個副本被認為是leader tablet。任何副本都可以對讀取進行服務,並且寫入時需要在為 tablet 服務的一組 tablet server之間達成一致性。

3、java代碼操作kudu

3.1 構建maven工程,導入依賴

<dependencies>

<dependency>

<groupId>org.apache.kudu</groupId>

<artifactId>kudu-client</artifactId>

<version>1.6.0</version>

</dependency>

   

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.12</version>

</dependency>

</dependencies>

  

3.2 初始化方法

public class TestKudu {

//定義KuduClient客戶端對象

private static KuduClient kuduClient;

//定義表名

private static String tableName="person";

   

/**

* 初始化方法

*/

@Before

public void init(){

//指定master地址

String masterAddress="node1,node2,node3";

//創建kudu的數據庫連接

kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();

   

}

   

//構建表schema的字段信息

//字段名稱 數據類型 是否為主鍵

public ColumnSchema newColumn(String name, Type type, boolean isKey){

ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);

column.key(isKey);

return column.build();

}

}

   

3.3 創建表

/** 使用junit進行測試

*

* 創建表

* @throws KuduException

*/

@Test

public void createTable() throws KuduException {

//設置表的schema

List<ColumnSchema> columns = new LinkedList<ColumnSchema>();

columns.add(newColumn("CompanyId", Type.INT32, true));

columns.add(newColumn("WorkId", Type.INT32, false));

columns.add(newColumn("Name", Type.STRING, false));

columns.add(newColumn("Gender", Type.STRING, false));

columns.add(newColumn("Photo", Type.STRING, false));

   

Schema schema = new Schema(columns);

   

//創建表時提供的所有選項

CreateTableOptions tableOptions = new CreateTableOptions();

//設置表的副本和分區規則

LinkedList<String> list = new LinkedList<String>();

list.add("CompanyId");

//設置表副本數

tableOptions.setNumReplicas(1);

//設置range分區

//tableOptions.setRangePartitionColumns(list);

//設置hash分區和分區的數量

tableOptions.addHashPartitions(list,3);

   

try {

kuduClient.createTable("person",schema,tableOptions);

} catch (Exception e) {

e.printStackTrace();

}

   

kuduClient.close();

   

}

3.4 插入數據

/**

* 向表中加載數據

* @throws KuduException

*/

@Test

public void loadData() throws KuduException {

//打開表

KuduTable kuduTable = kuduClient.openTable(tableName);

   

//創建KuduSession對象 kudu必須通過KuduSession寫入數據

KuduSession kuduSession = kuduClient.newSession();

   

//采用flush方式 手動刷新

kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

kuduSession.setMutationBufferSpace(3000);

   

//准備數據

for(int i=1; i<=10; i++){

Insert insert = kuduTable.newInsert();

//設置字段的內容

insert.getRow().addInt("CompanyId",i);

insert.getRow().addInt("WorkId",i);

insert.getRow().addString("Name","lisi"+i);

insert.getRow().addString("Gender","male");

insert.getRow().addString("Photo","person"+i);

   

kuduSession.flush();

kuduSession.apply(insert);

}

   

kuduSession.close();

kuduClient.close();

   

   

   

}

3.5 查詢數據

/**

* 查詢表數據

* @throws KuduException

*/

@Test

public void queryData() throws KuduException {

//打開表

KuduTable kuduTable = kuduClient.openTable(tableName);

//獲取scanner掃描器

KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);

KuduScanner scanner = scannerBuilder.build();

   

//遍歷

while(scanner.hasMoreRows()){

RowResultIterator rowResults = scanner.nextRows();

while (rowResults.hasNext()){

RowResult result = rowResults.next();

int companyId = result.getInt("CompanyId");

int workId = result.getInt("WorkId");

String name = result.getString("Name");

String gender = result.getString("Gender");

String photo = result.getString("Photo");

System.out.print("companyId:"+companyId+" ");

System.out.print("workId:"+workId+" ");

System.out.print("name:"+name+" ");

System.out.print("gender:"+gender+" ");

System.out.println("photo:"+photo);

}

}

   

//關閉

scanner.close();

kuduClient.close();

   

}

   

3.6 修改數據

/**

* 修改數據

* @throws KuduException

*/

@Test

public void updateData() throws KuduException {

//打開表

KuduTable kuduTable = kuduClient.openTable(tableName);

   

//構建kuduSession對象

KuduSession kuduSession = kuduClient.newSession();

//設置刷新數據模式,自動提交

kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

   

//更新數據需要獲取Update對象

Update update = kuduTable.newUpdate();

//獲取row對象

PartialRow row = update.getRow();

//設置要更新的數據信息

row.addInt("CompanyId",1);

row.addString("Name","kobe");

//操作這個update對象

kuduSession.apply(update);

   

kuduSession.close();

   

}

   

3.7 刪除數據

/**

* 刪除表中的數據

*/

@Test

public void deleteData() throws KuduException {

//打開表

KuduTable kuduTable = kuduClient.openTable(tableName);

KuduSession kuduSession = kuduClient.newSession();

//獲取Delete對象

Delete delete = kuduTable.newDelete();

//構建要刪除的行對象

PartialRow row = delete.getRow();

//設置刪除數據的條件

row.addInt("CompanyId",2);

kuduSession.flush();

kuduSession.apply(delete);

kuduSession.close();

kuduClient.close();

}

   

3.8 刪除表

/**

* 刪除表

*/

@Test

public void dropTable() throws KuduException {

//刪除表

DeleteTableResponse response = kuduClient.deleteTable(tableName);

//關閉客戶端連接

kuduClient.close();

}

   

3.9 kudu的分區方式

為了提供可擴展性,Kudu 表被划分為稱為 tablets 的單元,並分布在許多 tablet servers 上。行總是屬於單個tablet 。將行分配給 tablet 的方法由在表創建期間設置的表的分區決定。 kudu提供了3種分區方式。

3.9.1 Range Partitioning ( 范圍分區 )

范圍分區可以根據存入數據的數據量,均衡的存儲到各個機器上,防止機器出現負載不均衡現象.

/**

* 測試分區:

* RangePartition

*/

@Test

public void testRangePartition() throws KuduException {

//設置表的schema

LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();

columnSchemas.add(newColumn("CompanyId", Type.INT32,true));

columnSchemas.add(newColumn("WorkId", Type.INT32,false));

columnSchemas.add(newColumn("Name", Type.STRING,false));

columnSchemas.add(newColumn("Gender", Type.STRING,false));

columnSchemas.add(newColumn("Photo", Type.STRING,false));

   

//創建schema

Schema schema = new Schema(columnSchemas);

   

//創建表時提供的所有選項

CreateTableOptions tableOptions = new CreateTableOptions();

//設置副本數

tableOptions.setNumReplicas(1);

//設置范圍分區的規則

LinkedList<String> parcols = new LinkedList<String>();

parcols.add("CompanyId");

//設置按照那個字段進行range分區

tableOptions.setRangePartitionColumns(parcols);

   

/**

* range

* 0 < value < 10

* 10 <= value < 20

* 20 <= value < 30

* ........

* 80 <= value < 90

* */

int count=0;

for(int i =0;i<10;i++){

//范圍開始

PartialRow lower = schema.newPartialRow();

lower.addInt("CompanyId",count);

   

//范圍結束

PartialRow upper = schema.newPartialRow();

count +=10;

upper.addInt("CompanyId",count);

   

//設置每一個分區的范圍

tableOptions.addRangePartition(lower,upper);

}

   

try {

kuduClient.createTable("student",schema,tableOptions);

} catch (KuduException e) {

e.printStackTrace();

}

kuduClient.close();

   

   

}

   

3.9.2 Hash Partitioning ( 哈希分區 )

哈希分區通過哈希值將行分配到許多 buckets ( 存儲桶 )之一; 哈希分區是一種有效的策略,當不需要對表進行有序訪問時。哈希分區對於在 tablet 之間隨機散布這些功能是有效的,這有助於減輕熱點和 tablet 大小不均勻。

/**

* 測試分區:

* hash分區

*/

@Test

public void testHashPartition() throws KuduException {

//設置表的schema

LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();

columnSchemas.add(newColumn("CompanyId", Type.INT32,true));

columnSchemas.add(newColumn("WorkId", Type.INT32,false));

columnSchemas.add(newColumn("Name", Type.STRING,false));

columnSchemas.add(newColumn("Gender", Type.STRING,false));

columnSchemas.add(newColumn("Photo", Type.STRING,false));

   

//創建schema

Schema schema = new Schema(columnSchemas);

   

//創建表時提供的所有選項

CreateTableOptions tableOptions = new CreateTableOptions();

//設置副本數

tableOptions.setNumReplicas(1);

//設置范圍分區的規則

LinkedList<String> parcols = new LinkedList<String>();

parcols.add("CompanyId");

//設置按照那個字段進行range分區

tableOptions.addHashPartitions(parcols,6);

try {

kuduClient.createTable("dog",schema,tableOptions);

} catch (KuduException e) {

e.printStackTrace();

}

   

kuduClient.close();

}

   

3.9.3 Multilevel Partitioning ( 多級分區 )

Kudu 允許一個表在單個表上組合多級分區。 當正確使用時,多級分區可以保留各個分區類型的優點,同時減少每個分區的缺點 需求.

/**

* 測試分區:

* 多級分區

* Multilevel Partition

* 混合使用hash分區和range分區

*

* 哈希分區有利於提高寫入數據的吞吐量,而范圍分區可以避免tablet無限增長問題,

* hash分區和range分區結合,可以極大的提升kudu的性能

*/

@Test

public void testMultilevelPartition() throws KuduException {

//設置表的schema

LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();

columnSchemas.add(newColumn("CompanyId", Type.INT32,true));

columnSchemas.add(newColumn("WorkId", Type.INT32,false));

columnSchemas.add(newColumn("Name", Type.STRING,false));

columnSchemas.add(newColumn("Gender", Type.STRING,false));

columnSchemas.add(newColumn("Photo", Type.STRING,false));

   

//創建schema

Schema schema = new Schema(columnSchemas);

//創建表時提供的所有選項

CreateTableOptions tableOptions = new CreateTableOptions();

//設置副本數

tableOptions.setNumReplicas(1);

//設置范圍分區的規則

LinkedList<String> parcols = new LinkedList<String>();

parcols.add("CompanyId");

   

//hash分區

tableOptions.addHashPartitions(parcols,5);

   

//range分區

int count=0;

for(int i=0;i<10;i++){

PartialRow lower = schema.newPartialRow();

lower.addInt("CompanyId",count);

count+=10;

   

PartialRow upper = schema.newPartialRow();

upper.addInt("CompanyId",count);

tableOptions.addRangePartition(lower,upper);

}

   

try {

kuduClient.createTable("cat",schema,tableOptions);

} catch (KuduException e) {

e.printStackTrace();

}

kuduClient.close();

   

   

}

4、spark操作kudu

Spark與KUDU集成支持:

  • DDL操作(創建/刪除)
  • 本地Kudu RDD
  • Native Kudu數據源,用於DataFrame集成
  • 從kudu讀取數據
  • 從Kudu執行插入/更新/ upsert /刪除
  • 謂詞下推
  • Kudu和Spark SQL之間的模式映射

到目前為止,我們已經聽說過幾個上下文,例如SparkContext,SQLContext,HiveContext, SparkSession,現在,我們將使用Kudu引入一個KuduContext。這是可在Spark應用程序中廣播的主要可序列化對象。此類代表在Spark執行程序中與Kudu Java客戶端進行交互。 KuduContext提供執行DDL操作所需的方法,與本機Kudu RDD的接口,對數據執行更新/插入/刪除,將數據類型從Kudu轉換為Spark等。

4.1 引入依賴

<repositories>

<repository>

<id>cloudera</id>

<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

</repository>

</repositories>

   

<dependencies>

                <dependency>

<groupId>org.apache.kudu</groupId>

<artifactId>kudu-client-tools</artifactId>

<version>1.6.0-cdh5.14.0</version>

</dependency>

   

<dependency>

<groupId>org.apache.kudu</groupId>

<artifactId>kudu-client</artifactId>

<version>1.6.0-cdh5.14.0</version>

</dependency>

   

<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->

<dependency>

<groupId>org.apache.kudu</groupId>

<artifactId>kudu-spark2_2.11</artifactId>

<version>1.6.0-cdh5.14.0</version>

</dependency>

   

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.1.0</version>

</dependency>

</dependencies>

   

4.2 創建表

  • 定義kudu的表需要分成5個步驟:

    1:提供表名

    2:提供schema

    3:提供主鍵

    4:定義重要選項;例如:定義分區的schema

    5:調用create Table api

  • 代碼開發

object SparkKuduTest {

def main(args: Array[String]): Unit = {

//構建sparkConf對象

val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")

   

//構建SparkSession對象

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

   

//獲取sparkContext對象

val sc: SparkContext = sparkSession.sparkContext

sc.setLogLevel("warn")

 

//構建KuduContext對象

val kuduContext = new KuduContext("node1:7051,node2:7051,node3:7051",sc)

   

//1.創建表操作

createTable(kuduContext)

 

/**

* 創建表

* @param kuduContext

* @return

*/

private def createTable(kuduContext: KuduContext) = {

   

//1.1定義表名

val tableName = "spark_kudu"

   

//1.2 定義表的schema

val schema = StructType(

StructField("userId", StringType, false) ::

StructField("name", StringType, false) ::

StructField("age", IntegerType, false) ::

StructField("sex", StringType, false) :: Nil)

   

//1.3 定義表的主鍵

val primaryKey = Seq("userId")

   

//1.4 定義分區的schema

val options = new CreateTableOptions

//設置分區

options.setRangePartitionColumns(List("userId").asJava)

//設置副本

options.setNumReplicas(1)

   

//1.5 創建表

if(!kuduContext.tableExists(tableName)){

kuduContext.createTable(tableName, schema, primaryKey, options)

}

   

}

   

}

 

定義表時要注意的是Kudu表選項值。你會注意到在指定組成范圍分區列的列名列表時我們調用"asJava"方 法。這是因為在這里,我們調用了Kudu Java客戶端本身,它需要Java對象(即java.util.List)而不是Scala的List對 象;(要使"asJava"方法可用,請記住導入JavaConverters庫。) 創建表后,通過將瀏覽器指向http// master主機名:8051/tables

來查看Kudu主UI可以找到創建的表,通過單擊表ID,能夠看到表模式和分區信息。

   

點擊Table id 可以觀察到表的schema等信息:

   

4.3 dataFrame操作kudu

4.3.1 DML操作

Kudu支持許多DML類型的操作,其中一些操作包含在Spark on Kudu集成. 包括:

  • INSERT - 將DataFrame的行插入Kudu表。請注意,雖然API完全支持INSERT,但不鼓勵在Spark中使用它。 使用INSERT是有風險的,因為Spark任務可能需要重新執行,這意味着可能要求再次插入已插入的行。這樣 做會導致失敗,因為如果行已經存在,INSERT將不允許插入行(導致失敗)。相反,我們鼓勵使用下面描述 的INSERT_IGNORE。
  • INSERT-IGNORE - 將DataFrame的行插入Kudu表。如果表存在,則忽略插入動作。
  • DELETE - 從Kudu表中刪除DataFrame中的行
  • UPSERT - 如果存在,則在Kudu表中更新DataFrame中的行,否則執行插入操作。
  • UPDATE - 更新dataframe中的行
4.3.1.1 插入數據insert操作
  • 1、先創建一張表,然后把數據插入到表中

case class People(id:Int,name:String,age:Int)
object DataFrameKudu {
def main(args: Array[String]): Unit = {
//
構建SparkConf對象
val sparkConf: SparkConf = new SparkConf().setAppName("DataFrameKudu").setMaster("local[2]")
//
構建SparkSession對象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//
獲取SparkContext對象
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//
指定kudumaster地址
val kuduMaster="node1:7051,node2:7051,node3:7051"
//
構建KuduContext對象
val kuduContext = new KuduContext(kuduMaster,sc)

//
定義表名
val tableName="people"
//1
、創建表
createTable(kuduContext, tableName)

//2
、插入數據到表中
insertData2table(sparkSession,sc, kuduContext, tableName)

}

/**
*
創建表
* @param kuduContext
* @param tableName
*/
private def createTable(kuduContext: KuduContext, tableName: String): Unit = {
//
定義表的schema
val schema = StructType(
StructField("id", IntegerType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil
)

//
定義表的主鍵
val tablePrimaryKey = List("id")

//
定義表的選項配置
val options = new CreateTableOptions
options.setRangePartitionColumns(List("id").asJava)
options.setNumReplicas(1)

//
創建表
if (!kuduContext.tableExists(tableName)) {
kuduContext.createTable(tableName, schema, tablePrimaryKey, options)
}
}

/**
*
插入數據到表中
* @param sparkSession
* @param sc
* @param kuduContext
* @param tableName
*/
private def insertData2table(sparkSession:SparkSession,sc: SparkContext, kuduContext: KuduContext, tableName: String): Unit = {
//
准備數據
val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))
val peopleRDD: RDD[People] = sc.parallelize(data)
import sparkSession.implicits._
val peopleDF: DataFrame = peopleRDD.toDF
kuduContext.insertRows(peopleDF, tableName)



}

}

 

4.3.1.2 刪除數據delete操作

/**

* 刪除表的數據

* @param sparkSession

* @param sc

* @param kuduMaster

* @param kuduContext

* @param tableName

*/

private def deleteData(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {

//定義一個map集合,封裝kudu的相關信息

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

   

import sparkSession.implicits._

val data = List(People(1, "zhangsan", 20), People(2, "lisi", 30), People(3, "wangwu", 40))

val dataFrame: DataFrame = sc.parallelize(data).toDF

dataFrame.createTempView("temp")

//獲取年齡大於30的所有用戶id

val result: DataFrame = sparkSession.sql("select id from temp where age >30")

//刪除對應的數據,這里必須要是主鍵字段

kuduContext.deleteRows(result, tableName)

   

}

   

5.3.1.3 更新數據upsert操作

/**

* 更新數據--添加數據

*

* @param sc

* @param kuduMaster

* @param kuduContext

* @param tableName

*/

private def UpsertData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {

//更新表中的數據

//定義一個map集合,封裝kudu的相關信息

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

   

import sparkSession.implicits._

val data = List(People(1, "zhangsan", 50), People(5, "tom", 30))

val dataFrame: DataFrame = sc.parallelize(data).toDF

//如果存在就是更新,否則就是插入

kuduContext.upsertRows(dataFrame, tableName)

   

}

   

43.1.4 更新數據update操作

/**

* 更新數據

* @param sparkSession

* @param sc

* @param kuduMaster

* @param kuduContext

* @param tableName

*/

private def updateData(sparkSession: SparkSession,sc: SparkContext, kuduMaster: String, kuduContext: KuduContext, tableName: String): Unit = {

//定義一個map集合,封裝kudu的相關信息

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

   

import sparkSession.implicits._

val data = List(People(1, "zhangsan", 60), People(6, "tom", 30))

val dataFrame: DataFrame = sc.parallelize(data).toDF

//如果存在就是更新,否則就是報錯

kuduContext.updateRows(dataFrame, tableName)

   

}

4.3.2 DataFrameApi讀取kudu表中的數據

雖然我們可以通過上面顯示的KuduContext執行大量操作,但我們還可以直接從默認數據源本身調用讀/寫API。要設置讀取,我們需要為Kudu表指定選項,命名我們要讀取的表以及為表提供服務的Kudu集群的Kudu主服務器列表。

  • 代碼示例

/**

* 使用DataFrameApi讀取kudu表中的數據

* @param sparkSession

* @param kuduMaster

* @param tableName

*/

private def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {

//定義map集合,封裝kudumaster地址和要讀取的表名

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

sparkSession.read.options(options).kudu.show()

   

}

   

4.3.3 DataFrameApi寫數據到kudu表中

在通過DataFrame API編寫時,目前只支持一種模式"append"。尚未實現的"覆蓋"模式。

  • 代碼示例

/**

* DataFrame api 寫數據到kudu

* @param sparkSession

* @param sc

* @param kuduMaster

* @param tableName

*/

private def dataFrame2kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {

//定義map集合,封裝kudumaster地址和要讀取的表名

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

val data = List(People(7, "jim", 30), People(8, "xiaoming", 40))

import sparkSession.implicits._

val dataFrame: DataFrame = sc.parallelize(data).toDF

//dataFrame結果寫入到kudu表中 ,目前只支持append追加

dataFrame.write.options(options).mode("append").kudu

   

//查看結果

//導包

import org.apache.kudu.spark.kudu._

//加載表的數據,導包調用kudu方法,轉換為dataFrame,最后在使用show方法顯示結果

sparkSession.read.options(options).kudu.show()

}

4.3.4 使用sparksql操作kudu表

可以選擇使用Spark SQL直接使用INSERT語句寫入Kudu表;與'append'類似,INSERT語句實際上將默認使用 UPSERT語義處理;

  • 代碼示例

/**

* 使用sparksql操作kudu

* @param sparkSession

* @param sc

* @param kuduMaster

* @param tableName

*/

private def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {

//定義map集合,封裝kudumaster地址和表名

val options = Map(

"kudu.master" -> kuduMaster,

"kudu.table" -> tableName

)

val data = List(People(10, "小張", 30), People(11, "小王", 40))

import sparkSession.implicits._

val dataFrame: DataFrame = sc.parallelize(data).toDF

//dataFrame注冊成一張表

dataFrame.createTempView("temp1")

   

//獲取kudu表中的數據,然后注冊成一張表

sparkSession.read.options(options).kudu.createTempView("temp2")

//使用sparkSQLinsert操作插入數據

sparkSession.sql("insert into table temp2 select * from temp1")

sparkSession.sql("select * from temp2 where age >30").show()

   

}

   

4.4 Kudu Native RDD

Spark與Kudu的集成同時提供了kudu RDD.

  • 代碼示例

//使用kuduContext對象調用kuduRDD方法,需要sparkContext對象,表名,想要的字段名稱

val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sc,tableName,Seq("name","age"))

//操作該rdd 打印輸出

val result: RDD[(String, Int)] = kuduRDD.map {

case Row(name: String, age: Int) => (name, age)

}

result.foreach(println)

5、kudu集成impala

5.1 impala基本介紹

impala是cloudera提供的一款高效率的sql查詢工具,提供實時的查詢效果,官方測試性能比hive快10到100倍,其sql查詢比sparkSQL還要更加快速,號稱是當前大數據領域最快的查詢sql工具,

impala是參照谷歌的新三篇論文(Caffeine--網絡搜索引擎、Pregel--分布式圖計算、Dremel--交互式分析工具)當中的Dremel實現而來,其中舊三篇論文分別是(BigTable,GFS,MapReduce)分別對應我們即將學的HBase和已經學過的HDFS以及MapReduce。

impala是基於hive並使用內存進行計算,兼顧數據倉庫,具有實時,批處理,多並發等優點

Kudu與Apache Impala (孵化)緊密集成,impala天然就支持兼容kudu,允許開發人員使用Impala的SQL語法從Kudu的tablets 插入,查詢,更新和刪除數據;

5.2 impala的架構以及查詢計划

   

   

  • Impalad
    • 基本是每個DataNode上都會啟動一個Impalad進程,Impalad主要扮演兩個角色:
      • Coordinator:
        • 負責接收客戶端發來的查詢,解析查詢,構建查詢計划
        • 把查詢子任務分發給很多Executor,收集Executor返回的結果,組合后返回給客戶端
        • 對於客戶端發送來的DDL,提交給Catalogd處理
      • Executor:
        • 執行查詢子任務,將子任務結果返回給Coordinator
  • Catalogd
    • 整個集群只有一個Catalogd,負責所有元數據的更新和獲取
  • StateStored
    • 整個集群只有一個Statestored,作為集群的訂閱中心,負責集群不同組件的信息同步
    • 跟蹤集群中的Impalad的健康狀態及位置信息,由statestored進程表示,它通過創建多個線程來處理Impalad的注冊訂閱和與各Impalad保持心跳連接,各Impalad都會緩存一份State Store中的信息,當State Store離線后(Impalad發現State Store處於離線時,會進入recovery模式,反復注冊,當State Store重新加入集群后,自動恢復正常,更新緩存數據)因為Impalad有State Store的緩存仍然可以工作,但會因為有些Impalad失效了,而已緩存數據無法更新,導致把執行計划分配給了失效的Impalad,導致查詢失敗。

6、使用impala操作kudu整合

1、需要先啟動hdfs、hive、kudu、impala

2、使用impala的shell控制台

  • 執行命令impala-shell

(1):使用該impala-shell命令啟動Impala Shell 。默認情況下,impala-shell 嘗試連接到localhost端口21000 上的Impala守護程序。要連接到其他主機,請使用該-i <host:port>選項。要自動連接到特定的Impala數據庫,請使用該-d <database>選項。例如,如果您的所有Kudu表都位於數據庫中的Impala中impala_kudu,則-d impala_kudu可以使用此數據庫。
(2):要退出Impala Shell,請使用以下命令: quit;

6.1 創建kudu表

使用Impala創建新的Kudu表時,可以將該表創建為內部表或外部表。

6.1.1 內部表

內部表由Impala管理,當您從Impala中刪除時,數據和表確實被刪除。當您使用Impala創建新表時,它通常是內部表。

  • 使用impala創建內部表:

CREATE TABLE my_first_table
(
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'my_first_table'
);


CREATE TABLE 語句中,必須首先列出構成主鍵的列。

此時創建的表是內部表,從impala刪除表的時候,在底層存儲的kudu也會刪除表。

drop table if exists my_first_table

6.1.2 外部表

外部表(創建者CREATE EXTERNAL TABLE)不受Impala管理,並且刪除此表不會將表從其源位置(此處為Kudu)丟棄。相反,它只會去除Impala和Kudu之間的映射。這是Kudu提供的用於將現有表映射到Impala的語法。

使用java創建一個kudu表:

public class CreateTable {

private static ColumnSchema newColumn(String name, Type type, boolean iskey) {

ColumnSchema.ColumnSchemaBuilder column = new

ColumnSchema.ColumnSchemaBuilder(name, type);

column.key(iskey);

return column.build();

}

public static void main(String[] args) throws KuduException {

// master地址

final String masteraddr = "node1,node2,node3";

// 創建kudu的數據庫鏈接

KuduClient client = new

KuduClient.KuduClientBuilder(masteraddr).defaultSocketReadTimeoutMs(6000).build();

 

// 設置表的schema

List<ColumnSchema> columns = new LinkedList<ColumnSchema>();

columns.add(newColumn("CompanyId", Type.INT32, true));

columns.add(newColumn("WorkId", Type.INT32, false));

columns.add(newColumn("Name", Type.STRING, false));

columns.add(newColumn("Gender", Type.STRING, false));

columns.add(newColumn("Photo", Type.STRING, false));

Schema schema = new Schema(columns);

//創建表時提供的所有選項

CreateTableOptions options = new CreateTableOptions();

 

// 設置表的replica備份和分區規則

List<String> parcols = new LinkedList<String>();

 

parcols.add("CompanyId");

//設置表的備份數

options.setNumReplicas(1);

//設置range分區

options.setRangePartitionColumns(parcols);

 

//設置hash分區和數量

options.addHashPartitions(parcols, 3);

try {

client.createTable("person", schema, options);

} catch (KuduException e) {

e.printStackTrace();

}

client.close();

}

}

   

在kudu的頁面上可以觀察到如下信息:

   

在impala的命令行查看表:

當前在impala中並沒有person這個表

使用impala創建外部表 , 將kudu的表映射到impala上:

在impala-shell執行

CREATE EXTERNAL TABLE `person` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'person',
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')

6.2 使用impala對kudu進行DML操作

6.2.1 將數據插入 Kudu 表

impala 允許使用標准 SQL 語句將數據插入 Kudu 。

6.2.1.1 插入單個值

創建表

CREATE TABLE my_first_table
(
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

此示例插入單個行

INSERT INTO my_first_table VALUES (50, "zhangsan");

查看數據

select * from my_first_table

使用單個語句插入三行

INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");

6.2.1.2 批量插入Batch Insert

從 Impala 和 Kudu 的角度來看,通常表現最好的方法通常是使用 Impala 中的 SELECT FROM 語句導入數據

示例

INSERT INTO my_first_table
SELECT * FROM temp1;

6.2.2 更新數據

示例

UPDATE my_first_table SET name="xiaowang" where id =1 ;

6.2.3 刪除數據

示例

delete from my_first_table where id =2;

   

6.3 更改表屬性

開發人員可以通過更改表的屬性來更改 Impala 與給定 Kudu 表相關的元數據。這些屬性包括表名, Kudu 主地址列表,以及表是否由 Impala (內部)或外部管理。

6.3.1 Rename an Impala Mapping Table ( 重命名 Impala 映射表 )

ALTER TABLE PERSON RENAME TO person_temp;

   

6.3.2 Rename the underlying Kudu table for an internal table ( 重新命名內部表的基礎 Kudu 表 )

創建內部表:

CREATE TABLE kudu_student
(
CompanyId INT,
WorkId INT,
Name STRING,
Gender STRING,
Photo STRING,
PRIMARY KEY(CompanyId)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'student'
);

   

如果表是內部表,則可以通過更改 kudu.table_name 屬性重命名底層的 Kudu 表

ALTER TABLE kudu_student SET TBLPROPERTIES('kudu.table_name' = 'new_student');

效果圖

   

6.3.3 Remapping an external table to a different Kudu table ( 將外部表重新映射到不同的 Kudu 表 )

如果用戶在使用過程中發現其他應用程序重新命名了kudu表,那么此時的外部表需要重新映射到kudu上

創建一個外部表:

CREATE EXTERNAL TABLE external_table
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'person'
);

重新映射外部表,指向不同的kudu表:

ALTER TABLE external_table
SET TBLPROPERTIES('kudu.table_name' = 'hashTable')

上面的操作是:將external_table映射的PERSON表重新指向hashTable表

6.3.4 Change the Kudu Master Address ( 更改 Kudu Master 地址 )

ALTER TABLE my_table

SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051');

6.3.5 Change an Internally-Managed Table to External ( 將內部管理的表更改為外部 )

ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');

6.4 impala使用java語言操作kudu

對於impala而言,開發人員是可以通過JDBC連接impala的,有了JDBC,開發人員可以通過impala來間接操作 kudu;

6.4.1 引入依賴

<!--impalajdbc操作-->

         <dependency>

<groupId>com.cloudera</groupId>

<artifactId>ImpalaJDBC41</artifactId>

<version>2.5.42</version>

</dependency>

   

<!--Caused by : ClassNotFound : thrift.protocol.TPro-->

<dependency>

<groupId>org.apache.thrift</groupId>

<artifactId>libfb303</artifactId>

<version>0.9.3</version>

<type>pom</type>

</dependency>

   

<!--Caused by : ClassNotFound : thrift.protocol.TPro-->

<dependency>

<groupId>org.apache.thrift</groupId>

<artifactId>libthrift</artifactId>

<version>0.9.3</version>

<type>pom</type>

</dependency>

 

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-jdbc</artifactId>

<exclusions>

<exclusion>

<groupId>org.apache.hive</groupId>

<artifactId>hive-service-rpc</artifactId>

</exclusion>

<exclusion>

<groupId>org.apache.hive</groupId>

<artifactId>hive-service</artifactId>

</exclusion>

</exclusions>

<version>1.1.0</version>

</dependency>

   

<!--導入hive-->

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-service</artifactId>

<version>1.1.0</version>

</dependency>

  

6.4.2 通過JDBC連接impala操作kudu

使用JDBC連接impala操作kudu,與JDBC連接mysql做更重增刪改查基本一樣

6.4.2.1 創建實體類

package cn.itcast.impala.impala;

   

public class Person {

private int companyId;

private int workId;

private String name;

private String gender;

private String photo;

   

public Person(int companyId, int workId, String name, String gender, String photo) {

this.companyId = companyId;

this.workId = workId;

this.name = name;

this.gender = gender;

this.photo = photo;

}

   

public int getCompanyId() {

return companyId;

}

   

public void setCompanyId(int companyId) {

this.companyId = companyId;

}

   

public int getWorkId() {

return workId;

}

   

public void setWorkId(int workId) {

this.workId = workId;

}

   

public String getName() {

return name;

}

   

public void setName(String name) {

this.name = name;

}

   

public String getGender() {

return gender;

}

   

public void setGender(String gender) {

this.gender = gender;

}

   

public String getPhoto() {

return photo;

}

   

public void setPhoto(String photo) {

this.photo = photo;

}

   

}

6.4.2.2 JDBC連接impala對kudu進行增刪改查

package cn.itcast.impala.impala;

   

import java.sql.*;

   

public class Contants {

private static String JDBC_DRIVER="com.cloudera.impala.jdbc41.Driver";

private static String CONNECTION_URL="jdbc:impala://node1:21050/default;auth=noSasl";

//定義數據庫連接

static Connection conn=null;

//定義PreparedStatement對象

static PreparedStatement ps=null;

//定義查詢的結果集

static ResultSet rs= null;

   

   

//數據庫連接

public static Connection getConn(){

try {

Class.forName(JDBC_DRIVER);

conn=DriverManager.getConnection(CONNECTION_URL);

} catch (Exception e) {

e.printStackTrace();

}

   

return conn;

}

   

//創建一個表

public static void createTable(){

conn=getConn();

String sql="CREATE TABLE impala_kudu_test" +

"(" +

"companyId BIGINT," +

"workId BIGINT," +

"name STRING," +

"gender STRING," +

"photo STRING," +

"PRIMARY KEY(companyId)" +

")" +

"PARTITION BY HASH PARTITIONS 16 " +

"STORED AS KUDU " +

"TBLPROPERTIES (" +

"'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051'," +

"'kudu.table_name' = 'impala_kudu_test'" +

");";

   

try {

ps = conn.prepareStatement(sql);

ps.execute();

} catch (SQLException e) {

e.printStackTrace();

}

}

   

   

//查詢數據

public static ResultSet queryRows(){

try {

//定義執行的sql語句

String sql="select * from impala_kudu_test";

ps = getConn().prepareStatement(sql);

rs= ps.executeQuery();

} catch (SQLException e) {

e.printStackTrace();

}

   

return rs;

}

   

//打印結果

public static void printRows(ResultSet rs){

/**

private int companyId;

private int workId;

private String name;

private String gender;

private String photo;

*/

   

try {

while (rs.next()){

//獲取表的每一行字段信息

int companyId = rs.getInt("companyId");

int workId = rs.getInt("workId");

String name = rs.getString("name");

String gender = rs.getString("gender");

String photo = rs.getString("photo");

System.out.print("companyId:"+companyId+" ");

System.out.print("workId:"+workId+" ");

System.out.print("name:"+name+" ");

System.out.print("gender:"+gender+" ");

System.out.println("photo:"+photo);

   

}

} catch (SQLException e) {

e.printStackTrace();

}finally {

if(ps!=null){

try {

ps.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

   

if(conn !=null){

try {

conn.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

}

   

   

//插入數據

public static void insertRows(Person person){

conn=getConn();

String sql="insert into table impala_kudu_test(companyId,workId,name,gender,photo) values(?,?,?,?,?)";

   

try {

ps=conn.prepareStatement(sql);

//給占位符?賦值

ps.setInt(1,person.getCompanyId());

ps.setInt(2,person.getWorkId());

ps.setString(3,person.getName());

ps.setString(4,person.getGender());

ps.setString(5,person.getPhoto());

ps.execute();

   

} catch (SQLException e) {

e.printStackTrace();

}finally {

if(ps !=null){

try {

//關閉

ps.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

   

if(conn !=null){

try {

//關閉

conn.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

   

}

   

//更新數據

public static void updateRows(Person person){

//定義執行的sql語句

String sql="update impala_kudu_test set workId="+person.getWorkId()+

",name='"+person.getName()+"' ,"+"gender='"+person.getGender()+"' ,"+

"photo='"+person.getPhoto()+"' where companyId="+person.getCompanyId();

   

try {

ps= getConn().prepareStatement(sql);

ps.execute();

} catch (SQLException e) {

e.printStackTrace();

}finally {

if(ps !=null){

try {

//關閉

ps.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

   

if(conn !=null){

try {

//關閉

conn.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

}

   

//刪除數據

public static void deleteRows(int companyId){

 

//定義sql語句

String sql="delete from impala_kudu_test where companyId="+companyId;

try {

ps =getConn().prepareStatement(sql);

ps.execute();

} catch (SQLException e) {

e.printStackTrace();

   

}

   

}

 

//刪除表

public static void dropTable() {

String sql="drop table if exists impala_kudu_test";

try {

ps =getConn().prepareStatement(sql);

ps.execute();

} catch (SQLException e) {

e.printStackTrace();

}

}

   

}

   

6.4.2.3 代碼測試運行

package cn.itcast.impala.impala;

   

import java.sql.Connection;

   

public class ImpalaJdbcClient {

public static void main(String[] args) {

Connection conn = Contants.getConn();

   

//創建一個表

Contants.createTable();

   

//插入數據

Contants.insertRows(new Person(1,100,"lisi","male","lisi-photo"));

   

//查詢表的數據

ResultSet rs = Contants.queryRows();

Contants.printRows(rs);

   

//更新數據

Contants.updateRows(new Person(1,200,"zhangsan","male","zhangsan-photo"));

   

//刪除數據

Contants.deleteRows(1);

   

//刪除表

Contants.dropTable();

   

}

   

}

   

7、kudu原理

7.1 表與schema

Kudu設計是面向結構化存儲的,因此,Kudu的表需要用戶在建表時定義它的Schema信息,這些Schema信息包含:列定義(含類型),Primary Key定義(用戶指定的若干個列的有序組合)。數據的唯一性,依賴於用戶所提供的Primary Key中的Column組合的值的唯一性。 Kudu提供了Alter命令來增刪列,但位於Primary Key中的列是不允許刪除的。 Kudu當前並不支持二級索引。 從用戶角度來看,Kudu是一種存儲結構化數據表的存儲系統。在一個Kudu集群中可以定義任意數量的table,每個table都需要預先定義好schema。每個table的列數是確定的,每一列都需要有名字和類型,每個表中可以把其中一列或多列定義為主鍵。這么看來,Kudu更像關系型數據庫,而不是像HBase、Cassandra和MongoDB這些NoSQL數據庫。不過Kudu目前還不能像關系型數據一樣支持二級索引。

Kudu使用確定的列類型,而不是類似於NoSQL的"everything is byte"。這可以帶來兩點好處: 確定的列類型使Kudu可以進行類型特有的編碼。 可以提供 SQL-like 元數據給其他上層查詢工具,比如BI工具。

7.2 kudu的底層數據模型

Kudu的底層數據文件的存儲,未采用HDFS這樣的較高抽象層次的分布式文件系統,而是自行開發了一套可基於

Table/Tablet/Replica視圖級別的底層存儲系統。

這套實現基於如下的幾個設計目標:

• 可提供快速的列式查詢

• 可支持快速的隨機更新

• 可提供更為穩定的查詢性能保障

   

一張表會分成若干個tablet,每個tablet包括MetaData元信息及若干個RowSet,RowSet包含一個MemRowSet及若干個DiskRowSet,DiskRowSet中包含一個BloomFile、Ad_hoc Index、BaseData、DeltaMem及若干個RedoFile和UndoFile(UndoFile一般情況下只有一個)。

MemRowSet:用於新數據insert及已在MemRowSet中的數據的更新,一個MemRowSet寫滿后會將數據刷到磁盤形成若干個DiskRowSet。每次到達32M生成一個DiskRowSet。

DiskRowSet:用於老數據的變更(mutation),后台定期對DiskRowSet做compaction,以刪除沒用的數據及合並歷史數據,減少查詢過程中的IO開銷。

BloomFile:根據一個DiskRowSet中的key生成一個bloom filter,用於快速模糊定位某個key是否在DiskRowSet中存在。

Ad_hocIndex:是主鍵的索引,用於定位key在DiskRowSet中的具體哪個偏移位置。

BaseData是MemRowSet flush下來的數據,按列存儲,按主鍵有序。

UndoFile是基於BaseData之前時間的歷史數據,通過在BaseData上apply UndoFile中的記錄,可以獲得歷史數據。

RedoFile是基於BaseData之后時間的變更(mutation)記錄,通過在BaseData上apply RedoFile中的記錄,可獲得較新的數據。

DeltaMem用於DiskRowSet中數據的變更mutation,先寫到內存中,寫滿后flush到磁盤形成RedoFile。

   

   

MemRowSets可以對比理解成HBase中的MemStore, 而DiskRowSets可理解成HBase中的HFile。MemRowSets中的數據按照行試圖進行存儲,數據結構為B-Tree。

MemRowSets中的數據被Flush到磁盤之后,形成DiskRowSets。

DisRowSets中的數據,按照32MB大小為單位,按序划分為一個個的DiskRowSet。 DiskRowSet中的數據按照Column進行組織,與Parquet類似。

這是Kudu可支持一些分析性查詢的基礎。每一個Column的數據被存儲在一個相鄰的數據區域,而這個數據區域進一步被細分成一個個的小的Page單元,與HBase File中的Block類似,對每一個Column Page可采用一些Encoding算法,以及一些通用的Compression算法。 既然可對Column Page可采用Encoding以及Compression算法,那么,對單條記錄的更改就會比較困難了。

前面提到了Kudu可支持單條記錄級別的更新/刪除,是如何做到的?

   

與HBase類似,也是通過增加一條新的記錄來描述這次更新/刪除操作的。DiskRowSet是不可修改了,那么 KUDU 要如何應對數據的更新呢?在KUDU中,把DiskRowSet分為了兩部分:base data、delta stores。base data 負責存儲基礎數據,delta stores負責存儲 base data 中的變更數據.

   

如上圖所示,數據從 MemRowSet 刷到磁盤后就形成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在內存中都會有一個對應的DeltaMemStore,負責記錄此 DiskRowSet 后續的數據變更(更新、刪除)。DeltaMemStore 內部維護一個 B-樹索引,映射到每個 row_offset 對應的數據變更。DeltaMemStore 數據增長到一定程度后轉化成二進制文件存儲到磁盤,形成一個 DeltaFile,隨着 base data 對應數據的不斷變更,DeltaFile 逐漸增長。

7.3 Tablet的發現過程

當創建Kudu客戶端時,其會從主服務器上獲取tablet位置信息,然后直接與服務於該tablet的服務器進行交談。

為了優化讀取和寫入路徑,客戶端將保留該信息的本地緩存,以防止他們在每個請求時需要查詢主機的tablet位置信息。

隨着時間的推移,客戶端的緩存可能會變得過時,並且當寫入被發送到不再是tablet領導者的tablet服務器時,則將被拒絕。然后客戶端將通過查詢主服務器發現新領導者的位置來更新其緩存。

   

7.4 kudu的寫流程

   

如上圖,當 Client 請求寫數據時,先根據主鍵從Master Server中獲取要訪問的目標 Tablets,然后到依次對應的Tablet獲取數據。

因為KUDU表存在主鍵約束,所以需要進行主鍵是否已經存在的判斷,這里就涉及到之前說的索引結構對讀寫的優化了。一個Tablet中存在很多個RowSets,為了提升性能,我們要盡可能地減少要掃描的RowSets數量。

首先,我們先通過每個 RowSet 中記錄的主鍵的(最大最小)范圍,過濾掉一批不存在目標主鍵的RowSets,然后在根據RowSet中的布隆過濾器,過濾掉確定不存在目標主鍵的 RowSets,最后再通過RowSets中的 B-樹索引,精確定位目標主鍵是否存在。

如果主鍵已經存在,則報錯(主鍵重復),否則就進行寫數據(寫 MemRowSet)。

7.5 kudu的讀流程

   

如上圖,數據讀取過程大致如下:先根據要掃描數據的主鍵范圍,定位到目標的Tablets,然后讀取Tablets 中的RowSets。

在讀取每個RowSet時,先根據主鍵過濾要scan范圍,然后加載范圍內的base data,再找到對應的delta stores,應用所有變更,最后union上MemRowSet中的內容,返回數據給Client。

7.6 kudu的更新流程

   

數據更新的核心是定位到待更新數據的位置,這塊與寫入的時候類似,就不展開了,等定位到具體位置后,然后將變更寫到對應的delta store 中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM