Creating a Kudu table with Spark
Defining a Kudu table takes five steps:
1: Provide the table name
2: Provide the schema
3: Provide the primary key
4: Define important options, e.g. the partitioning schema
5: Call the createTable API
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import collection.JavaConverters._

/**
  * Created by angel;
  */
object CURD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext pointing at the Kudu masters; this is what Spark uses to create the table
    val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)
    // TODO 1: define the table name
    val kuduTableName = "spark_kudu_tbl"
    // TODO 2: define the schema
    val schema = StructType(
      StructField("CompanyId", StringType, false) ::
      StructField("name", StringType, false) ::
      StructField("sex", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    // TODO 3: define the table's primary key
    val kuduTablePrimaryKey = Seq("CompanyId")
    // TODO 4: define the partitioning schema
    // Partition columns must be part of the primary key
    val kuduTableOptions = new CreateTableOptions()
    kuduTableOptions.
      setRangePartitionColumns(List("CompanyId").asJava).
      setNumReplicas(3)
    // TODO 5: call the createTable API
    kuduContext.createTable(kuduTableName, schema, kuduTablePrimaryKey, kuduTableOptions)
  }
}
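For step 4, range partitioning is only one choice; CreateTableOptions also supports hash partitioning via addHashPartitions. The sketch below is only an illustration of that alternative (the bucket count of 3 is an arbitrary example value, and the column must again belong to the primary key):

import org.apache.kudu.client.CreateTableOptions
import collection.JavaConverters._

// Hash-partition the table on the primary key column into 3 buckets, with 3 replicas
val hashOptions = new CreateTableOptions()
  .addHashPartitions(List("CompanyId").asJava, 3)
  .setNumReplicas(3)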
One item to pay attention to when defining the table is the Kudu table options value. Notice that we call the "asJava" method when specifying the list of column names that make up the range partition columns. This is because here we are calling into the Kudu Java client itself, which expects Java objects (i.e. java.util.List) rather than Scala's List. (To make the "asJava" method available, remember to import the JavaConverters library.)
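To make the conversion explicit, here is a minimal sketch of what asJava does in this context (the column name is just the one from the example above):

import collection.JavaConverters._

// Scala List -> java.util.List, which is what the Kudu Java client API expects
val scalaCols: List[String] = List("CompanyId")
val javaCols: java.util.List[String] = scalaCols.asJava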
After the table is created, you can find it in the Kudu master UI by pointing a browser at http://<master-hostname>:8051/tables; clicking the table ID shows the table's schema and partition information.
(Click the table ID to see the table's schema and related details.)
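Besides the web UI, you can also verify the table programmatically from Spark. A minimal sketch, reusing the kuduContext and kuduTableName from the example above:

// Check from Spark that the table was actually created
if (kuduContext.tableExists(kuduTableName)) {
  println(s"Table $kuduTableName exists")
}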
Deleting a Kudu table with Spark
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object DropTable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext pointing at the Kudu masters
    val kuduContext = new KuduContext("hadoop01:7051,hadoop02:7051,hadoop03:7051", sqlContext.sparkContext)
    // TODO specify the name of the table to delete
    var kuduTableName = "spark_kudu_tbl"
    // TODO if the table exists, delete it
    if (kuduContext.tableExists(kuduTableName)) {
      kuduContext.deleteTable(kuduTableName)
    }
  }
}