spark sql createOrReplaceTempView registerTempTable


createOrReplaceTempView2.x版本以上。
registerTempTable1.5.x
  val data1 = dataSelect1(sqlContext, sparkModel)
  val data2 = dataSelect2(sqlContext, sparkModel)
  data1.createOrReplaceTempView("new_table1_info")
  data2.createOrReplaceTempView("new_table2_info")
 
  val sql='select t1.*,t2.* from new_table1_info t1 left join new_table2_info t2 on t1.name=t2.name2' where t1.age>18 ;
  val result = sqlContext.sql(sql)
 
  def dataSelect1(sqlContext: HiveContext, sm: SparkModel): DataFrame = {
    val sql = "select name,age,other  from table1 where name !='' "
    sqlContext.sql(sql)
  }
  
 
  def dataSelect2(sqlContext: HiveContext, sm: SparkModel): DataFrame = {
    val sql = "select name2,age2,other2,hh  from table2 where name !='' "
    sqlContext.sql(sql)
  } 

如果一次处理后的数据太多,一下字发送kafka就会导致kafka对赛,那就然他睡几毫秒。不可可根据业务数据调试。每分钟100万左右就可以了吧。

result.foreach(x => {
      val json = new JSONObject(x)
      kafka.value.send(topic, json.toString)
      Thread.sleep(everynum.toInt)
    })

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM