Kudu supports many DML-style operations, several of which are included in the Spark on Kudu integration:
- INSERT - insert the rows of a DataFrame into a Kudu table. Note that although the API fully supports INSERT, its use in Spark is discouraged. INSERT is risky because a Spark task may need to be re-executed, which means rows that were already inserted may be asked to be inserted again. That re-insert fails, because INSERT does not allow inserting a row that already exists. Instead, we encourage the use of INSERT_IGNORE, described below.
- INSERT_IGNORE - insert the rows of a DataFrame into a Kudu table. If a row already exists, the insert of that row is ignored (a minimal sketch follows this list).
- DELETE - delete the rows of a DataFrame from a Kudu table.
- UPSERT - update the rows of a DataFrame in a Kudu table if they already exist, otherwise insert them.
- UPDATE - update the corresponding rows of a Kudu table with the rows of a DataFrame.
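This section has no dedicated INSERT_IGNORE example, so a minimal sketch follows. It mirrors the Insert example below and assumes the same Kudu master addresses, the spark_kudu_tbl table, and a Customer case class whose fields are inferred from the sample data; insertIgnoreRows is the KuduContext method used here for INSERT_IGNORE in this version of the integration.

import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Customer case class assumed throughout this section (fields inferred from the sample data)
case class Customer(name: String, age: Int, city: String)

object InsertIgnore {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("InsertIgnoreExample").setMaster("local")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    import sqlContext.implicits._

    // connect to Kudu from Spark
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    val kuduTableName = "spark_kudu_tbl"
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))
    val customersDF = sparkContext.parallelize(customers).toDF()

    // Rows whose primary key already exists are silently skipped instead of failing the task,
    // which makes re-execution of a Spark task safe.
    kuduContext.insertIgnoreRows(customersDF, kuduTableName)

    // read the table back to verify
    sqlContext.read.options(kuduOptions).kudu.show
  }
}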
Insert operation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.kudu.spark.kudu._

/**
  * Created by angel;
  */
// Customer case class used by the examples in this section (fields inferred from the sample data)
case class Customer(name: String, age: Int, city: String)

object Insert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext

    // connect to Kudu from Spark (creating the table itself is sketched after this example)
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)

    // TODO 1: define the Kudu table name
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    import sqlContext.implicits._

    // TODO 3: define the data
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))

    // TODO 4: create an RDD
    val customersRDD = sparkContext.parallelize(customers)

    // TODO 5: convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()

    // TODO 6: insert the data into the Kudu table
    kuduContext.insertRows(customersDF, kuduTableName)

    // TODO 7: read the inserted data back
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
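The comment in the example above mentions creating the Kudu table from Spark, but the code only opens a KuduContext and assumes that spark_kudu_tbl already exists. The snippet below is a minimal sketch of how the table could be created with kuduContext.createTable, placed right after the KuduContext is constructed in the Insert example; the schema, the choice of name as the primary key, and the partitioning and replication settings are assumptions inferred from the sample data rather than part of the original tutorial.

import org.apache.kudu.client.CreateTableOptions
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

// Assumed schema for spark_kudu_tbl, matching Customer(name, age, city); "name" is taken as the primary key
val customerSchema = StructType(List(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("city", StringType, nullable = true)))

if (!kuduContext.tableExists(kuduTableName)) {
  kuduContext.createTable(kuduTableName, customerSchema, Seq("name"),
    new CreateTableOptions()
      .addHashPartitions(List("name").asJava, 3) // hash-partition on the key into 3 buckets (illustrative)
      .setNumReplicas(3))                        // replication factor, assuming a 3-node cluster
}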
Delete operation
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
// Customer case class as defined in the Insert example
case class Customer(name: String, age: Int, city: String)

object Delete {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext

    // connect to Kudu from Spark
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)

    // TODO 1: define the Kudu table name
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    import sqlContext.implicits._

    // TODO 3: define the data
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))

    // TODO 4: create an RDD
    val customersRDD = sparkContext.parallelize(customers)

    // TODO 5: convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()

    // TODO 6: register a temporary table
    customersDF.registerTempTable("customers")

    // TODO 7: write a SQL query that selects the primary keys of the rows to delete
    val deleteKeysDF = sqlContext.sql("select name from customers where age > 20")

    // TODO 8: delete those rows from the Kudu table (the DataFrame only needs the key columns)
    kuduContext.deleteRows(deleteKeysDF, kuduTableName)

    // TODO 9: check the data in the Kudu table
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
Upsert operation
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
// Customer case class as defined in the Insert example
case class Customer(name: String, age: Int, city: String)

object Upsert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext

    // connect to Kudu from Spark
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)

    // TODO 1: define the Kudu table name
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    import sqlContext.implicits._

    // TODO 3: define the dataset of new and changed customers
    val newAndChangedCustomers = Array(
      Customer("michael", 25, "chicago"),
      Customer("denise", 43, "winnipeg"),
      Customer("jordan", 19, "toronto"))

    // TODO 4: convert the dataset to a DataFrame
    val newAndChangedRDD = sparkContext.parallelize(newAndChangedCustomers)
    val newAndChangedDF = newAndChangedRDD.toDF()

    // TODO 5: upsert the rows (update where the key exists, insert otherwise)
    kuduContext.upsertRows(newAndChangedDF, kuduTableName)

    // TODO 6: read the data back from Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}
Update operation
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
// Customer case class as defined in the Insert example
case class Customer(name: String, age: Int, city: String)

object Update {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // set the master and the Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext

    // connect to Kudu from Spark
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)

    // TODO 1: define the Kudu table name
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table" -> kuduTableName,
      "kudu.master" -> kuduMasters)

    // TODO 3: prepare the dataset
    val modifiedCustomers = Array(Customer("michael", 25, "toronto"))
    val modifiedCustomersRDD = sparkContext.parallelize(modifiedCustomers)

    // TODO 4: convert the dataset to a DataFrame
    import sqlContext.implicits._
    val modifiedCustomersDF = modifiedCustomersRDD.toDF()

    // TODO 5: perform the update (the rows must already exist in the table)
    kuduContext.updateRows(modifiedCustomersDF, kuduTableName)

    // TODO 6: check the data in Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}