spark sql createOrReplaceTempView registerTempTable


createOrReplaceTempView2.x版本以上。
registerTempTable1.5.x
  val data1 = dataSelect1(sqlContext, sparkModel)
  val data2 = dataSelect2(sqlContext, sparkModel)
  data1.createOrReplaceTempView("new_table1_info")
  data2.createOrReplaceTempView("new_table2_info")
 
  val sql='select t1.*,t2.* from new_table1_info t1 left join new_table2_info t2 on t1.name=t2.name2' where t1.age>18 ;
  val result = sqlContext.sql(sql)
 
  def dataSelect1(sqlContext: HiveContext, sm: SparkModel): DataFrame = {
    val sql = "select name,age,other  from table1 where name !='' "
    sqlContext.sql(sql)
  }
  
 
  def dataSelect2(sqlContext: HiveContext, sm: SparkModel): DataFrame = {
    val sql = "select name2,age2,other2,hh  from table2 where name !='' "
    sqlContext.sql(sql)
  } 

如果一次處理后的數據太多,一下字發送kafka就會導致kafka對賽,那就然他睡幾毫秒。不可可根據業務數據調試。每分鍾100萬左右就可以了吧。

result.foreach(x => {
      val json = new JSONObject(x)
      kafka.value.send(topic, json.toString)
      Thread.sleep(everynum.toInt)
    })

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM