使用spark集成kudu做DDL


spark對kudu表的創建

定義kudu的表需要分成5個步驟:

1:提供表名

2:提供schema

3:提供主鍵

4:定義重要選項;例如:定義分區的schema

5:調用create Table api

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import collection.JavaConverters._
/**
  * Created by angel;
  */
object CURD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      //設置Master_IP並設置spark參數
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculationfalse", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    //使用spark創建kudu表
    val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)
    //TODO 1:定義表名
    val kuduTableName = "spark_kudu_tbl"
    //TODO 2:定義schema
    val schema = StructType(
        StructField("CompanyId", StringType, false) ::
        StructField("name", StringType, false) ::
        StructField("sex", StringType, true) ::
        StructField("age", IntegerType, true) :: Nil
    )
    ////TODO 3:定義表的主鍵
    val kuduTablePrimaryKey = Seq("CompanyId")
    //TODO 4:定義分區的schema
    val kuduTableOptions = new CreateTableOptions()
    kuduTableOptions.
      setRangePartitionColumns(List("name").asJava).
      setNumReplicas(3)
    //TODO 5:調用create Table api
    kuduContext.createTable(
      kuduTableName,schema,kuduTablePrimaryKey, kuduTableOptions)
  }
}

定義表時要注意的一個項目是Kudu表選項值。您會注意到在指定組成范圍分區列的列名列表時我們調用“asJava”方法。這是因為在這里,我們調用了Kudu Java客戶端本身,它需要Java對象(即java.util.List)而不是Scala的List對象;(要使“asJava”方法可用,請記住導入JavaConverters庫。)

創建表后,通過將瀏覽器指向http:// <master-hostname>:8051 / tables來查看Kudu主UI可以找到創建的表,通過單擊表ID,能夠看到表模式和分區信息。

(點擊Table id 可以觀察到表的schema等信息)

spark刪除kudu表

object DropTable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      //設置Master_IP並設置spark參數
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculationfalse", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    //使用spark創建kudu表
    val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)

    // TODO 指定要刪除的表名稱
    var kuduTableName = "spark_kudu_tbl"

    // TODO 檢查表如果存在,那么刪除表
    if (kuduContext.tableExists(kuduTableName)) {
      kuduContext.deleteTable(kuduTableName)
    }
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM