Converting a Spark Streaming DStream to a DataFrame


Project background: Spark Streaming consumes data from Kafka, but it receives that data as a DStream. The business logic needs to run SQL, so the DStream has to be converted to a DataFrame before the business SQL statements can be written.

The core code snippet is as follows:

import com.****.common.PropertieUtil // in-house utility object (see section 2 below)
import net.sf.json.JSONObject
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.{ArrayBuffer, Map, Set}

object SparkStreaming03 {
  def main(args: Array[String]): Unit = {
    val brokers=PropertieUtil.getString("kafka.brokers")
    val topic=PropertieUtil.getString("kafka.topic")
    val tableNameHn=PropertieUtil.getString("hbase.tablenamehn")
    val tableNamesd=PropertieUtil.getString("hbase.tablenamesd")
    val hbaseZookeeperList=PropertieUtil.getString("hbase.zookeeper")

    val sparkconf=new SparkConf().setMaster("local[2]").setAppName("SparkStreaming")

    val ssc=new StreamingContext(sparkconf,Seconds(10))
    val accumulators=ssc.sparkContext.longAccumulator // not used below; accumulator(0) is deprecated in Spark 2.x
    val topics=Set(topic)
    val kafkaParams=Map[String,Object](
      "bootstrap.servers" ->brokers,
      "key.deserializer" ->classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "*g",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val kafkaStream=KafkaUtils.createDirectStream[String,String](ssc, PreferConsistent,Subscribe[String,String](topics,kafkaParams))
    val events=kafkaStream.map(record=>record.value()) // keep only the record value (a JSON string)
    events.foreachRDD(rdd=>{
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      rdd.map(pair=>dataArray(pair))
        .map(v=>(v(0),v(1),v(2)))
        .toDF("PROID","KEY","VALUE")
        .registerTempTable("speedtable")
      val dataframe=sqlContext.sql("select * from speedtable")
      dataframe.show()
      println(dataframe.count())
    })

    ssc.start()
    ssc.awaitTermination()
  }
  def dataArray(pair:String):ArrayBuffer[String]={
    // parse the JSON string from Kafka and collect the fields mapped to PROID/KEY/VALUE above
    val array = ArrayBuffer[String]()
    val jsonObject=JSONObject.fromObject(pair)
    val pro=jsonObject.get("A").toString
    array.append(pro)
    val key=jsonObject.get("B").toString
    array.append(key)
    val data=jsonObject.get("C").toString
    array.append(data)
    array
  }
}
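Note that SQLContext.getOrCreate and registerTempTable still work here, but both are deprecated in Spark 2.x. A minimal sketch of the same foreachRDD conversion written against the SparkSession API, reusing the events stream and the dataArray helper from above (an illustration, not the original code):

    events.foreachRDD(rdd=>{
      // obtain (or create) a SparkSession bound to the same SparkContext
      val spark=org.apache.spark.sql.SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      rdd.map(pair=>dataArray(pair))
        .map(v=>(v(0),v(1),v(2)))
        .toDF("PROID","KEY","VALUE")
        .createOrReplaceTempView("speedtable")
      spark.sql("select * from speedtable").show()
    })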

2. Utility object

import com.typesafe.config.{Config, ConfigFactory}

object PropertieUtil extends java.io.Serializable{
  val config:Config=ConfigFactory.load()
  def getString(path:String):String=
    config.getString(path)

  def getInteger(path:String):Integer=
    config.getString(path).toInt

  def getDouble(path:String):Double=
    config.getString(path).toDouble

  // Ad-hoc test of string handling; not related to the config helpers above.
  def main(args: Array[String]): Unit = {
    val key="18082817000000369"
    print(key.substring(0,6).reverse+key.substring(6,key.length))
  }
}
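By default, ConfigFactory.load() resolves application.conf (or application.json / application.properties) from the classpath, so the configuration shown in the next section would typically be placed in src/main/resources/application.conf for keys such as "kafka.brokers" and "hbase.zookeeper" to resolve. For example (return values assuming the configuration below):

    PropertieUtil.getString("kafka.topic")       // "*g"
    PropertieUtil.getString("hbase.tablenamehn") // "*03"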

3. Configuration file

mysql {
  url : "jdbc:mysql://192.168.76.14:3306/ibigdata?useUnicode=true&characterEncoding=UTF-8",
  dbtable: "record_test",
  user: "root",
  password:"root"
}
kafka{
  brokers:"192.168.113.11:9092,192.168.*.12:9092,192.168.*.13:9092,192.168.*.14:9092,192.168.*.15:9092"
//  brokers:"192.168.58.55:9092,192.168.58.56:9092,192.168.58.57:9092"
//  topic :"keliu_test"
  topic :"*g"

}
hbase{
  zookeeper:"192.168.*.11:2181,192.168.*.12:2181,192.168.*.13:2181,192.168.*.14:2181,192.168.*.15:2181"
  tablenamehn:"*03"
  tablenamesd:"*SITE_SD"
}

The data handled above is all in this format:

{"LINENO":"378","ISUPDOWN":"1","STATIONID":"27","UPPASSENGER":"6","DOWNPASSENGER":"17","INSTINE":"2018-09-11 12:03:01"}

If instead the data has the following nested structure:

{"V":"V7","Date":{"LINENO":"378","ISUPDOWN":"1","STATIONID":"27","UPPASSENGER":"6","DOWNPASSENGER":"17","INSTINE":"2018-09-11 12:03:01"}}

then you only need to adjust the Spark Streaming consumer code as shown below (the snippet is the same as above, except that it also extracts the nested JSON object):

  def dataArray(pair:String):ArrayBuffer[String]={
    val array = ArrayBuffer[String]()
    val jsonObject=JSONObject.fromObject(pair)
    val pro=jsonObject.get("V").toString
    //array.append(pro)
    // extract the nested "Date" object and append each of its fields
    val date=jsonObject.getJSONObject("Date")
    array.append(date.get("LINENO").toString)
    array.append(date.get("ISUPDOWN").toString)
    array.append(date.get("STATIONID").toString)
    array.append(date.get("UPPASSENGER").toString)
    array.append(date.get("DOWNPASSENGER").toString)
    array.append(date.get("INSTINE").toString)
    array
  }
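Since this version of dataArray returns six fields instead of three, the DataFrame construction inside foreachRDD also has to be widened if all six values are to appear as columns; otherwise only the first three elements would be picked up. A minimal sketch, with illustrative column names mirroring the JSON keys:

      rdd.map(pair=>dataArray(pair))
        .map(v=>(v(0),v(1),v(2),v(3),v(4),v(5)))
        .toDF("LINENO","ISUPDOWN","STATIONID","UPPASSENGER","DOWNPASSENGER","INSTINE")
        .registerTempTable("speedtable")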

 

