一、Streaming與Flume的聯調
Spark 2.2.0 對應於 Flume 1.6.0
兩種模式:
1. Flume-style push-based approach:
Flume推送數據給Streaming
Streaming的receiver作為Flume的Avro agent
Spark workers應該跑在Flume這台機器上
Streaming先啟動,receiver監聽Flume push data的端口
實現:
寫flume配置文件:
netcat source -> memory channel -> avro sink
IDEA開發:
添加Spark-flume依賴
對應的API是FlumeUtils
開發代碼:
import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * Spark Streaming整合Flume的第一種方式 * */ object FlumePushWordCount { def main(args: Array[String]): Unit = { //外部傳入參數 if (args.length != 2) { System.out.println("Usage: FlumePushWordCount <hostname> <port>") System.exit(1) } val Array(hostname, port) = args //外部args數組 val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //選擇輸入ssc的createStream方法,生成一個InputDStream val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt) //由於flume的內容有head有body, 需要先把內容拿出來, 並去掉空值 flumeStream.map(x => new String(x.event.getBody.array()).trim) .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
注意:為了不hard-core,選擇外部傳入hostname和port
在IDEA測試時,可以在

里面的program argument輸入運行參數
在本地測試時:
先啟動Streaming作業,然后啟動flume agent,最后通過telnet輸入數據,觀察IDEA的控制台輸出
在服務器測試時:
submit時一定要把maven依賴中在--packages加上,自動會在網絡上下載依賴
當不能下載時,需要--jars才能把預先下載好的jar包加上
2. Pull-based approach using a custom sink:
Streaming拉數據
Flume推送的數據先放到sink緩沖區
Streaming使用一個
reliable flume receiver,確保了數據的接收和備份
可靠性更高,支持容錯,生產上面常用
一台機器運行Flume agent,Spark集群其他機器可訪問這台機器的custom sink
實現:
Flume配置:
使用相關jars包,配置依賴:(參考Spark官網)
sink是一個獨特的type
IDEA開發:
對應上面Flume的依賴,使用的是
createPollStream,區別於第一種模式
其他地方都一樣,體現了Spark代碼的復用性
本地測試:
先啟動flume!!后啟動Streaming作業
二、Streaming與Kafka的聯調
Spark2.2.0對應於Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
兩種模式:
1. Receiver-based approach
使用Kafka高級用戶API
為了確保零數據丟失,需要用到
Write Ahead Logs(出現於Spark 1.2)
同步地保存接收到的數據到日志當中,出錯時可以恢復(容錯機制)
這是傳統的方式,在ZK server中消費數據
用KafkaUtils和Streaming對接,一樣需要加入kafka的各種依賴(見官網)
使用的API是createStream
注意:
- 此處的topic分區和RDD的分區不同概念
- 多個Kafka DStream可以並行接收
- 用write ahead logs時需要配置StorageLevel.MEMORY_AND_DISK_SER
准備工作:
啟動ZK server
啟動kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
創建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
測試topic能否正確生產和消費
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
IDEA代碼:
import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * SparkStreaming對接Kafka其中的Receiver-based方式 * */ object KafkaReceiverWordCount { def main(args: Array[String]): Unit = { if (args.length != 4) { System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //createStream需要傳入的其中一個參數是一個Map,就是topics對應的線程數 val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap) //一定要取Stream的第二位才是數據,可以print出來看看,在實際生產中只是更改這一行的業務邏輯!!! message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
本地測試/服務器測試:
從IDEA中輸入參數,即可看到結果
從服務器測試也是打包submit就行,看web UI的時候留意驗證receiver是占有一個Job的,證實了前面的理論
2. Direct Approach
No receiver!!!
從
Spark 1.3 版本開始有
沒有了Receiver,而是
周期性地檢測Kafka的offset,用了kafka simple consumer API
優點:
- 簡化了並行度,不需要創建多個input stream
- 性能更好,達到零數據丟失,且不需要保存副本於write ahead logs中
- 一次語義Exactly-once semantics
缺點:不能在zookeeper中更新offset,但可以自己設置讓其更新
使用的API是createDirectStream
准備工作和上面一樣。
IDEA代碼:
import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /* * SparkStreaming對接Kafka其中的Direct方式 * */ object KafkaDirectWordCount { def main(args: Array[String]): Unit = { if (args.length != 4) { System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>") System.exit(1) } val Array(brokers, topics) = args val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //createDirectStream需要傳入kafkaParams和topicsSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val topicsSet = topics.split(",").toSet val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet ) //一定要取Stream的第二位才是數據,可以print出來看看 message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
注意:StringDecoder有可能因為前面寫Kafka java API時的包沖突而導入失敗
在IDEA運行時報錯:
這是由於之前在Kafka基礎學習中我設置的kafka的依賴是0.9.0.0,和我們IDEA沖突,所以要把這一個依賴注釋掉才能執行
調優時就是配置createDirectStream的參數嘛!!
三、Flume + Kafka + Spark Streaming常用流處理架構
實現的需求:實時(到現在為止)的日志訪問統計操作
由於本人缺乏日志采集來源,故使用python語言來實現一個
日志生成器,模擬生產環境中服務器不斷生成日志的過程
本生成器產生的日志內容包括ip、time、url、status、referer
根據前面的知識,我們在實現的過程中有以下步驟:
1. Flume的選型,在本例中設為exec-memory-kafka
2. 打開kafka一個消費者,再啟動flume讀取日志生成器中的log文件,可看到kafka中成功讀取到日志產生器的實時數據
3. 讓Kafka接收到的數據傳輸到Spark Streaming當中,這樣就可以在Spark對實時接收到的數據進行操作了
由於與前面一、二的操作基本一致,此處不再重復列出詳細操作過程
下面直接進入Spark中對實時數據的操作:
分為數據清洗過程、統計功能實現過程兩個步驟!其中統計功能的實現基本上和Spark SQL中的操作一致,這又體現了Spark的代碼復用性,即能通用於多個框架中