Spark on Kudu: DML operations


Kudu supports many DML-style operations, and several of them are included in the Spark on Kudu integration. These include:

  • INSERT - inserts the rows of a DataFrame into the Kudu table. Note that although the API fully supports INSERT, using it from Spark is discouraged. INSERT is risky because a Spark task may be re-executed, which means rows that have already been inserted may be inserted again; since INSERT does not allow a row whose primary key already exists, the retried task fails. Use the INSERT_IGNORE operation described below instead.

  • INSERT_IGNORE - inserts the rows of a DataFrame into the Kudu table, silently skipping any row whose primary key already exists (a minimal sketch follows this list).

  • DELETE - deletes the rows of a DataFrame from the Kudu table.

  • UPSERT - updates rows of the DataFrame that already exist in the Kudu table, and inserts the rest.

  • UPDATE - updates existing rows in the Kudu table with the values in the DataFrame.
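
The listings below all build their DataFrames from a Customer case class whose definition is not shown; a plausible top-level definition is assumed here. The INSERT_IGNORE path also has no dedicated listing, so a minimal sketch follows. It reuses the sparkContext, sqlContext, kuduContext, and kuduTableName set up in the Insert example below; the call name insertIgnoreRows is an assumption that matches the kudu-spark KuduContext API (newer releases may expose the same behavior through write options instead).

// Assumed case class shared by every example in this article (not shown in the original listings)
case class Customer(name: String, age: Int, city: String)

// INSERT_IGNORE sketch: rows whose primary key already exists are skipped instead of
// failing the job, which makes re-executed Spark tasks safe.
import sqlContext.implicits._
val retriedCustomers = Array(
  Customer("jane", 30, "new york"),   // already present in the table: silently ignored
  Customer("alice", 27, "boston"))    // hypothetical new row: inserted
val retriedDF = sparkContext.parallelize(retriedCustomers).toDF()
kuduContext.insertIgnoreRows(retriedDF, kuduTableName)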

Insert operation

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.kudu.spark.kudu._
/**
  * Created by angel;
  */
object Insert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and common Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext that talks to the Kudu masters
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: Name the Kudu table
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: Configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table"  -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._
    // TODO 3: Define the data (using the Customer case class)
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))

    // TODO 4: Create an RDD
    val customersRDD = sparkContext.parallelize(customers)
    // TODO 5: Convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()

    // TODO 6: Insert the rows into the Kudu table
    kuduContext.insertRows(customersDF, kuduTableName)

    // TODO 7: Read back the inserted rows
    sqlContext.read.options(kuduOptions).kudu.show
  }
}

Delete operation

import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
object Delete {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and common Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext that talks to the Kudu masters
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: Name the Kudu table
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: Configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table"  -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._
    // TODO 3: Define the data
    val customers = Array(
      Customer("jane", 30, "new york"),
      Customer("jordan", 18, "toronto"))

    // TODO 4: Create an RDD
    val customersRDD = sparkContext.parallelize(customers)
    // TODO 5: Convert the RDD to a DataFrame
    val customersDF = customersRDD.toDF()
    // TODO 6: Register the DataFrame as a temporary view
    customersDF.createOrReplaceTempView("customers")

    // TODO 7: Select the rows to delete; the resulting DataFrame must contain the
    // table's primary key column(s) (here assumed to be name)
    val deleteKeysDF = sqlContext.sql("select name from customers where age > 20")

    // TODO 8: Delete the matching rows with the KuduContext
    kuduContext.deleteRows(deleteKeysDF, kuduTableName)

    // TODO 9: Check the remaining rows in the Kudu table
    sqlContext.read.options(kuduOptions).kudu.show
  }
}

Upsert operation

If a row with the same primary key already exists in the Kudu table, it is updated with the values in the DataFrame; otherwise the row is inserted.

import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
object Upsert {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and common Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext that talks to the Kudu masters
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: Name the Kudu table
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: Configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table"  -> kuduTableName,
      "kudu.master" -> kuduMasters)
    import sqlContext.implicits._

    // TODO 3: Define the data set (new rows plus changed rows)
    val newAndChangedCustomers = Array(
      Customer("michael", 25, "chicago"),
      Customer("denise" , 43, "winnipeg"),
      Customer("jordan" , 19, "toronto"))

    // TODO 4: Convert the data set to a DataFrame
    val newAndChangedRDD = sparkContext.parallelize(newAndChangedCustomers)
    val newAndChangedDF  = newAndChangedRDD.toDF()

    // TODO 5: Upsert the rows (update existing rows, insert new ones)
    kuduContext.upsertRows(newAndChangedDF, kuduTableName)

    // TODO 6: Read the data back from Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}

Update operation

Updates existing rows in the Kudu table with the values in the DataFrame.

import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by angel;
  */
object Update {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AcctfileProcess")
      // Set the master and common Spark parameters
      .setMaster("local")
      .set("spark.worker.timeout", "500")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "1")
      .set("spark.speculation", "false")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = SparkContext.getOrCreate(sparkConf)
    val sqlContext = SparkSession.builder().config(sparkConf).getOrCreate().sqlContext
    // Create a KuduContext that talks to the Kudu masters
    val kuduMasters = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
    val kuduContext = new KuduContext(kuduMasters, sqlContext.sparkContext)
    // TODO 1: Name the Kudu table
    val kuduTableName = "spark_kudu_tbl"

    // TODO 2: Configure the Kudu options
    val kuduOptions: Map[String, String] = Map(
      "kudu.table"  -> kuduTableName,
      "kudu.master" -> kuduMasters)

    // TODO 3: Prepare the modified data
    val modifiedCustomers = Array(Customer("michael", 25, "toronto"))
    val modifiedCustomersRDD = sparkContext.parallelize(modifiedCustomers)
    // TODO 4: Convert the data set to a DataFrame
    import sqlContext.implicits._
    val modifiedCustomersDF  = modifiedCustomersRDD.toDF()

    // TODO 5: Update the existing rows in the Kudu table
    kuduContext.updateRows(modifiedCustomersDF, kuduTableName)

    // TODO 6: Check the data in Kudu
    sqlContext.read.options(kuduOptions).kudu.show
  }
}

 

