Spark Streaming實現實時流處理


一、Streaming與Flume的聯調

Spark 2.2.0 對應於 Flume 1.6.0
 
兩種模式:
 
1. Flume-style push-based approach:
 
Flume推送數據給Streaming
 
Streaming的receiver作為Flume的Avro agent
 
Spark workers應該跑在Flume這台機器上
 
Streaming先啟動,receiver監聽Flume push data的端口
 
 
實現:
 
寫flume配置文件:
netcat source -> memory channel -> avro sink
 
IDEA開發:
添加Spark-flume依賴
對應的API是FlumeUtils
 
開發代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming整合Flume的第一種方式
* */
object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
 
    //外部傳入參數
    if (args.length != 2) {
      System.out.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
 
    val Array(hostname, port) = args  //外部args數組
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //選擇輸入ssc的createStream方法,生成一個InputDStream
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
 
    //由於flume的內容有head有body, 需要先把內容拿出來, 並去掉空值
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
        .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:為了不hard-core,選擇外部傳入hostname和port
 
在IDEA測試時,可以在
里面的program argument輸入運行參數
 
在本地測試時:
 
先啟動Streaming作業,然后啟動flume agent,最后通過telnet輸入數據,觀察IDEA的控制台輸出
 
 
在服務器測試時:
 
submit時一定要把maven依賴中在--packages加上,自動會在網絡上下載依賴
當不能下載時,需要--jars才能把預先下載好的jar包加上
 
 
 
2. Pull-based approach using a custom sink:
 
Streaming拉數據
 
Flume推送的數據先放到sink緩沖區
 
Streaming使用一個 reliable flume receiver,確保了數據的接收和備份
 
可靠性更高,支持容錯,生產上面常用
 
一台機器運行Flume agent,Spark集群其他機器可訪問這台機器的custom sink
 
實現:
 
Flume配置:
使用相關jars包,配置依賴:(參考Spark官網)
sink是一個獨特的type
 
IDEA開發:
對應上面Flume的依賴,使用的是 createPollStream,區別於第一種模式
其他地方都一樣,體現了Spark代碼的復用性
 
本地測試:
先啟動flume!!后啟動Streaming作業
 
 
 
二、Streaming與Kafka的聯調
 
Spark2.2.0對應於Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
 
兩種模式:
 
1. Receiver-based approach
 
使用Kafka高級用戶API
為了確保零數據丟失,需要用到 Write Ahead Logs(出現於Spark 1.2)
同步地保存接收到的數據到日志當中,出錯時可以恢復(容錯機制)
這是傳統的方式,在ZK server中消費數據
 
用KafkaUtils和Streaming對接,一樣需要加入kafka的各種依賴(見官網)
使用的API是createStream
 
注意:
  1. 此處的topic分區和RDD的分區不同概念
  2. 多個Kafka DStream可以並行接收
  3. 用write ahead logs時需要配置StorageLevel.MEMORY_AND_DISK_SER
 
 
准備工作:
 
啟動ZK server
啟動kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
創建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
測試topic能否正確生產和消費
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
 
IDEA代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* SparkStreaming對接Kafka其中的Receiver-based方式
* */
object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
 
    val Array(zkQuorum, group, topics, numThreads) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createStream需要傳入的其中一個參數是一個Map,就是topics對應的線程數
    val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap
 
    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap)
 
    //一定要取Stream的第二位才是數據,可以print出來看看,在實際生產中只是更改這一行的業務邏輯!!!
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
本地測試/服務器測試:
 
從IDEA中輸入參數,即可看到結果
 
從服務器測試也是打包submit就行,看web UI的時候留意驗證receiver是占有一個Job的,證實了前面的理論
 
 
 
2. Direct Approach
 
No receiver!!!
 
Spark 1.3 版本開始有
 
沒有了Receiver,而是 周期性地檢測Kafka的offset,用了kafka simple consumer API
 
優點:
  1. 簡化了並行度,不需要創建多個input stream
  2. 性能更好,達到零數據丟失,且不需要保存副本於write ahead logs中
  3.  一次語義Exactly-once semantics
 
缺點:不能在zookeeper中更新offset,但可以自己設置讓其更新
 
 使用的API是createDirectStream
 
准備工作和上面一樣。
 
IDEA代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 
/*
* SparkStreaming對接Kafka其中的Direct方式
* */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>")
      System.exit(1)
    }
 
    val Array(brokers, topics) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createDirectStream需要傳入kafkaParams和topicsSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
 
    val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )
 
 
    //一定要取Stream的第二位才是數據,可以print出來看看
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:StringDecoder有可能因為前面寫Kafka java API時的包沖突而導入失敗
 
在IDEA運行時報錯:
這是由於之前在Kafka基礎學習中我設置的kafka的依賴是0.9.0.0,和我們IDEA沖突,所以要把這一個依賴注釋掉才能執行
 
調優時就是配置createDirectStream的參數嘛!!
 
 
 
三、Flume + Kafka + Spark Streaming常用流處理架構
 
實現的需求:實時(到現在為止)的日志訪問統計操作
 
由於本人缺乏日志采集來源,故使用python語言來實現一個 日志生成器,模擬生產環境中服務器不斷生成日志的過程
本生成器產生的日志內容包括ip、time、url、status、referer
 
根據前面的知識,我們在實現的過程中有以下步驟:
1. Flume的選型,在本例中設為exec-memory-kafka
2. 打開kafka一個消費者,再啟動flume讀取日志生成器中的log文件,可看到kafka中成功讀取到日志產生器的實時數據
3. 讓Kafka接收到的數據傳輸到Spark Streaming當中,這樣就可以在Spark對實時接收到的數據進行操作了
 
由於與前面一、二的操作基本一致,此處不再重復列出詳細操作過程
 
 
下面直接進入Spark中對實時數據的操作:
 
分為數據清洗過程、統計功能實現過程兩個步驟!其中統計功能的實現基本上和Spark SQL中的操作一致,這又體現了Spark的代碼復用性,即能通用於多個框架中
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM