項目背景:Spark Streaming 消費 Kafka 中的數據,而 Spark Streaming 接收到的數據類型是 DStream。由於業務需要編寫 SQL 語句,需要將 DStream 轉換為 DataFrame,這樣就可以用 SQL 實現業務邏輯。具體做法是在 foreachRDD 中把每個批次的 RDD 轉為 DataFrame,並註冊為臨時表後再執行 SQL。
1. 核心代碼片段如下:
import com.****.common.PropertieUtil // project-local configuration helper
import net.sf.json.JSONObject
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.{ArrayBuffer, Map, Set}

/**
 * Consumes JSON messages from Kafka with Spark Streaming and, for every 10-second micro-batch,
 * turns the batch into a DataFrame registered as the temp view `speedtable` so it can be queried
 * with SQL.
 */
object SparkStreaming03 {

  def main(args: Array[String]): Unit = {
    val brokers = PropertieUtil.getString("kafka.brokers")
    val topic = PropertieUtil.getString("kafka.topic")

    // Fall back to local mode only when no master was supplied (e.g. by spark-submit --master);
    // a hard-coded setMaster would override the cluster setting.
    val sparkConf = new SparkConf()
      .setAppName("SparkStreaming")
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "*g",
      "auto.offset.reset" -> "latest",
      // Auto-commit is off: offsets are committed manually once a batch has been processed.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Set(topic), kafkaParams))

    // foreachRDD must run on the direct stream itself: only the original KafkaRDD carries
    // offset ranges, a mapped RDD does not.
    kafkaStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // One lazily created SparkSession per JVM (replaces the deprecated SQLContext.getOrCreate).
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      rdd.map(record => dataArray(record.value()))
        .map(fields => (fields(0), fields(1), fields(2)))
        .toDF("PROID", "KEY", "VALUE")
        .createOrReplaceTempView("speedtable")

      val dataframe = spark.sql("select * from speedtable ")
      dataframe.show()
      println(dataframe.count())

      // Commit only after the batch's output has run, so progress is recorded for the group
      // (at-least-once: a failure before this line replays the batch).
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one Kafka message value as a JSON object and extracts the fields "A", "B" and "C".
   *
   * @param pair the raw message value
   * @return a buffer with the string forms of A, B and C, in that order
   * @throws IllegalArgumentException if one of the three fields is absent
   */
  def dataArray(pair: String): ArrayBuffer[String] = {
    val json = JSONObject.fromObject(pair)
    val fields = ArrayBuffer[String]()
    for (key <- Seq("A", "B", "C")) {
      val value = json.get(key)
      // JSONObject.get yields null for a missing key; fail with context instead of a bare NPE.
      require(value != null, s"missing field '$key' in record: $pair")
      fields.append(value.toString)
    }
    fields
  }
}
2. 工具對象(讀取配置的 PropertieUtil):
import com.typesafe.config.{Config, ConfigFactory}

/** Thin accessor over the application's Typesafe Config, loaded once via ConfigFactory.load(). */
object PropertieUtil extends java.io.Serializable {

  val config: Config = ConfigFactory.load()

  /** Returns the value at `path` as a string. */
  def getString(path: String): String = config.getString(path)

  /** Returns the value at `path`, read as a string and parsed as a base-10 integer. */
  def getInteger(path: String): Integer = Integer.valueOf(getString(path).toInt)

  /** Returns the value at `path`, read as a string and parsed as a double. */
  def getDouble(path: String): Double = getString(path).toDouble

  /** Scratch entry point: prints a sample key with its first six characters reversed. */
  def main(args: Array[String]): Unit = {
    val key = "18082817000000369"
    val (prefix, rest) = key.splitAt(6)
    print(prefix.reverse + rest)
  }
}
3. 配置文件(HOCON 格式,即 ConfigFactory.load() 默認從 classpath 讀取的 application.conf):
// Settings read through PropertieUtil (Typesafe Config). Each setting sits on its own line:
// on a single line, a `//` comment would swallow everything after it.
mysql {
  url: "jdbc:mysql://192.168.76.14:3306/ibigdata?useUnicode=true&characterEncoding=UTF-8",
  dbtable: "record_test",
  user: "root",
  password: "root"
  // Optional override so the real password need not be kept in this file;
  // when MYSQL_PASSWORD is unset the value above is kept.
  password: ${?MYSQL_PASSWORD}
}
kafka {
  brokers: "192.168.113.11:9092,192.168.*.12:9092,192.168.*.13:9092,192.168.*.14:9092,192.168.*.15:9092"
  // brokers: "192.168.58.55:9092,192.168.58.56:9092,192.168.58.57:9092"
  // topic: "keliu_test"
  topic: "*g"
}
hbase {
  zookeeper: "192.168.*.11:2181,192.168.*.12:2181,192.168.*.13:2181,192.168.*.14:2181,192.168.*.15:2181"
  tablenamehn: "*03"
  tablenamesd: "*SITE_SD"
}
上面代碼所消費的 Kafka 數據都是如下格式:
{"LINENO":"378","ISUPDOWN":"1","STATIONID":"27","UPPASSENGER":"6","DOWNPASSENGER":"17","INSTINE":"2018-09-11 12:03:01"}
如果數據是下面這種嵌套結構:
{"V":"V7","Date":{"LINENO":"378","ISUPDOWN":"1","STATIONID":"27","UPPASSENGER":"6","DOWNPASSENGER":"17","INSTINE":"2018-09-11 12:03:01"}}
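只需修改 Spark Streaming 消費代碼中的 dataArray 方法即可(其餘代碼與上面相同,只是先通過 getJSONObject 取出嵌套的 Date 對象,再讀取其中的字段)。注意:該版本返回 6 個字段,調用處的 map(v => (v(0), v(1), v(2))) 以及 toDF 的列名也需要相應改為 6 列。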
def dataArray(pair:String):ArrayBuffer[String]={ val array = ArrayBuffer[String]() val JsonObject=JSONObject.fromObject(pair) val pro=JsonObject.get("V").toString //array.append(pro) //構造 val Date= JsonObject.getJSONObject("Date") val a=Date.get("LINENO") val b=Date.get("ISUPDOWN") val c=Date.get("STATIONID") val d=Date.get("UPPASSENGER") val e=Date.get("DOWNPASSENGER") val f=Date.get("INSTINE") array.append(a.toString) array.append(b.toString) array.append(c.toString) array.append(d.toString) array.append(e.toString) array.append(f.toString) array