A Real-Time Clickstream Case Study with Kafka + Spark Streaming + HBase


Background

Kafka ingests data in real time from collection tools such as Flume or from business systems' real-time interfaces, and serves as a message buffer that provides a reliable data feed to the downstream real-time computing framework. Since version 1.3, Spark has supported two mechanisms for integrating with Kafka (the Receiver-based Approach and the Direct Approach); see the official documentation link at the end of this article for details, and the brief sketch contrasting the two below. HBase is used for data storage.
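
As a point of reference, here is a minimal sketch of what the two mechanisms look like with the spark-streaming-kafka-0-8 API; the zkhost, broker, and demo-group names are placeholders, not addresses from this case.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaIntegrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KafkaIntegrationSketch").setMaster("local[*]"), Seconds(5))

    // Receiver-based Approach: consumes through ZooKeeper; offsets are tracked by the receiver
    val receiverStream = KafkaUtils.createStream(
      ssc, "zkhost:2181", "demo-group", Map("user_events" -> 1))

    // Direct Approach: queries partition offsets from the brokers directly; no receiver needed
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker:9092"), Set("user_events"))

    directStream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

This case uses the Direct Approach, which avoids receivers and write-ahead logs and lets Spark track Kafka offsets itself.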

Implementation Approach

  1. Implement a Kafka message producer simulator
  2. Have Spark Streaming consume the Kafka data in real time using the Direct Approach
  3. Have Spark Streaming perform the business computation and write the results to HBase

Component Versions

Spark 2.1.0, Kafka 0.9.0.1, HBase 1.2.0
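
Assuming an sbt build (the article does not specify one), dependencies consistent with the versions above might look like the following; the fastjson artifact and version are an assumption, as the original only shows the import usage.

// build.sbt -- a sketch; Spark 2.1.0 is built for Scala 2.11
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"           % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0",  // Direct Approach API used below
  "org.apache.kafka" %  "kafka-clients"             % "0.9.0.1",
  "org.apache.hbase" %  "hbase-client"              % "1.2.0",
  "com.alibaba"      %  "fastjson"                  % "1.2.47"  // assumed version
)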

Code Implementation

Kafka Message Simulator

import java.util.{Properties, UUID}

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

object KafkaMessageGenerator {

  private val random = new Random()
  private var pointer = -1

  private val os_type = Array("Android", "IPhone OS", "None", "Windows Phone")

  // Simulated click count: a random integer in [0, 10)
  def click(): Int = random.nextInt(10)

  // Cycle through the device types in order
  def getOsType(): String = {
    pointer = pointer + 1
    if (pointer >= os_type.length) pointer = 0
    os_type(pointer)
  }

  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    val props = new Properties()
    props.put("bootstrap.servers", "10.3.71.154:9092")
    // Both key and value are plain strings, matching KafkaProducer[String, String]
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    while (true) {
      val event = new JSONObject()
      event.put("uid", UUID.randomUUID().toString)                // randomly generated user id
      event.put("event_time", System.currentTimeMillis.toString)  // event timestamp
      event.put("os_type", getOsType())                           // device type
      event.put("click_count", click())                           // click count
      val record = new ProducerRecord[String, String](topic, event.toString)
      producer.send(record)
      println("Message sent: " + event)
      Thread.sleep(200)
    }
  }
}
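
Before starting the simulator, the user_events topic must exist (or broker auto-creation must be enabled). Here is a sketch using Kafka 0.9's AdminUtils; the ZooKeeper address is an assumption borrowed from the HBase configuration below, so adjust it to the actual cluster.

import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

object CreateUserEventsTopic {
  def main(args: Array[String]): Unit = {
    // master66:2181 is assumed here; replace with the real ZooKeeper quorum
    val zkUtils = ZkUtils("master66:2181", 30000, 30000, isZkSecurityEnabled = false)
    // 1 partition, replication factor 1 -- adjust for a real cluster
    AdminUtils.createTopic(zkUtils, "user_events", 1, 1, new Properties())
    zkUtils.close()
  }
}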

Spark Streaming Main Class

import com.alibaba.fastjson.{JSON, JSONObject}
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PageViewStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageViewStream").setMaster("local[*]")
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configuration: the Direct Approach only needs the broker list
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.3.71.154:9092"
    )

    // Create a direct stream on the user_events topic
    val kafkaStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("user_events"))

    // Parse each message value into a JSON object
    val events: DStream[JSONObject] = kafkaStream.flatMap { line =>
      Some(JSON.parseObject(line._2))
    }

    // Aggregate click counts per user within each batch
    val userClicks: DStream[(String, Int)] = events
      .map(x => (x.getString("uid"), x.getIntValue("click_count")))
      .reduceByKey(_ + _)

    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // HBase configuration: one connection per partition
        val tableName = "PageViewStream2"
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "master66")
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val statTable = conn.getTable(TableName.valueOf(tableName))

        partitionOfRecords.foreach { pair =>
          val uid = pair._1    // user id
          val click = pair._2  // click count
          // Assemble the Put: uid as row key, per-batch click count in family "Stat2"
          val put = new Put(Bytes.toBytes(uid))
          put.addColumn("Stat2".getBytes, "ClickStat".getBytes, Bytes.toBytes(click.toString))
          statTable.put(put)
        }
        statTable.close()
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
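
The job above assumes the target table already exists. A minimal sketch for creating PageViewStream2 with column family Stat2 via the HBase 1.2 client API follows; the object name CreatePageViewTable is hypothetical.

import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}

object CreatePageViewTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master66")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val table = TableName.valueOf("PageViewStream2")
    if (!admin.tableExists(table)) {
      val desc = new HTableDescriptor(table)
      desc.addFamily(new HColumnDescriptor("Stat2"))  // family used by the Put above
      admin.createTable(desc)
    }
    admin.close()
    conn.close()
  }
}

Note also that the streaming job opens and closes an HBase connection per partition per batch, which keeps the example self-contained; a production job would typically reuse a pooled or broadcast connection instead.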
