Spark Streaming + Kafka集成指南
Kafka項目在版本0.8和0.10之間引入了一個新的消費者API,因此有兩個獨立的相應Spark Streaming包可用。請選擇正確的包, 請注意,0.8集成與后來的0.9和0.10代理兼容,但0.10集成與早期的代理不兼容。
注意:從Spark 2.3.0開始,不推薦使用Kafka 0.8支持。
Spark Streaming從Kafka接收數據,轉換為spark streaming中的數據結構Dstream。數據接收方式有兩種 :1 使用Receiver接收的舊方法:2使用Direct拉取的新方法(在Spark 1.3中引入)。
https://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html
Receiver方式
Received是使用Kafka高級Consumer API實現的。與所有接收器一樣,從Kafka通過Receiver接收的數據存儲在Spark Executor的內存中,然后由Spark Streaming啟動的job來處理數據。然而默認配置下,這種方式可能會因為底層的失敗而丟失數據(請參閱接收器可靠性)。如果要啟用高可靠機制,確保零數據丟失,要啟用Spark Streaming的預寫日志機制(Write Ahead Log,(已引入)在Spark 1.2)。該機制會同步地將接收到的Kafka數據保存到分布式文件系統(比如HDFS)上的預寫日志中,以便底層節點在發生故障時也可以使用預寫日志中的數據進行恢復。
如下圖:
接下來,我們將討論如何在流應用程序中使用此方法。
1 鏈接
對於使用Maven項目定義的Scala / Java應用程序時,我們需要添加相應的依賴包:
<dependency><!-- Spark Streaming Kafka --> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.11</artifactId> <version>1.6.3</version> </dependency>
2 編程
在流應用程序代碼中,導入KafkaUtils
並創建輸入DStream,如下所示。
Scala編程:
import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
Java編程
import org.apache.spark.streaming.kafka.*; JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
還有幾個需要注意的點:
- Kafka中topic的partition與Spark Streaming中生成的RDD的partition無關,因此,在KafkaUtils.createStream()中,增加某個topic的partition的數量,只會增加單個Receiver消費topic的線程數,也就是讀取Kafka中topic partition的線程數量,它不會增加Spark在處理數據時的並行性。
- 可以使用不同的consumer group和topic創建多個Kafka輸入DStream,以使用多個receiver並行接收數據。
- 如果已使用HDFS等復制文件系統啟用了“預讀日志”,則接收的數據已在日志中復制。因此,輸入流的存儲級別的存儲級別
StorageLevel.MEMORY_AND_DISK_SER
(即,使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
)。
3 部署
與任何Spark應用程序一樣,spark-submit
用於啟動應用程序。但是,Scala / Java應用程序和Python應用程序的細節略有不同。
對於Scala和Java應用程序,如果您使用SBT或Maven進行項目管理,則將spark-streaming-kafka_2.11
其及其依賴項打包到應用程序JAR中。確保spark-core_2.10
並spark-streaming_2.10
標記為provided
Spark安裝中已存在的依賴項。然后使用spark-submit
啟動應用程序。
對於缺少SBT / Maven項目管理的Python應用程序,spark-streaming-kafka_2.11
可以直接將其依賴項添加到spark-submit
使用中--packages
。那是,
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 ...
另外,您也可以下載Maven構件的JAR spark-streaming-kafka-assembly
從 Maven倉庫,並將其添加到spark-submit
用--jars
。
Direct方式
在spark1.3之后,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會周期性的獲取Kafka中每個topic的每個partition中的最新offsets,之后根據設定的maxRatePerPartition來處理每個batch。其形式如下圖:
這種方法相較於Receiver方式的優勢在於:
- 簡化的並行:在Receiver的方式中我們提到創建多個Receiver之后利用union來合並成一個Dstream的方式提高數據傳輸並行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的並行讀取Kafka數據,這種映射關系也更利於理解和優化。
- 高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。
- 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由於Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。
請注意,此方法的一個缺點是它不會更新Zookeeper中的偏移量,因此基於Zookeeper的Kafka監視工具將不會顯示進度。但是,您可以在每個批處理中訪問此方法處理的偏移量,並自行更新Zookeeper。
接下來,我們將討論如何在流應用程序中使用此方法。
1 鏈接
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.1</version> </dependency>
2 編程
請注意,導入的命名空間包括版本org.apache.spark.streaming.kafka010
Scala編程
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("topicA", "topicB") val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) stream.map(record => (record.key, record.value))
流中的每個項目都是ConsumerRecord,有關可能的kafkaParams,請參閱Kafka使用者配置文檔。如果Spark批處理持續時間大於默認的Kafka心跳會話超時(30秒),請適當增加heartbeat.interval.ms和session.timeout.ms。對於大於5分鍾的批次,這將需要在代理上更改group.max.session.timeout.ms。請注意,該示例將enable.auto.commit設置為false,有關討論,請參閱存儲偏移。
3 Direct方式案例
package com.xyg.spark import kafka.serializer.{StringDecoder, Decoder} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkContext, SparkConf} import scala.reflect.ClassTag /** * Created by Administrator on 2018/7/28. */ object SparkStreamDemo { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("spark_streaming") conf.setMaster("local[*]") val sc = new SparkContext(conf) sc.setCheckpointDir("D:/checkpoints") sc.setLogLevel("ERROR") val ssc = new StreamingContext(sc, Seconds(5)) // val topics = Map("spark" -> 2) val kafkaParams = Map[String, String]( "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092", "group.id" -> "spark", "auto.offset.reset" -> "smallest" ) // 直連方式拉取數據,這種方式不會修改數據的偏移量,需要手動的更新 val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("spark")).map(_._2) // val lines = KafkaUtils.createStream(ssc, "node21:2181,node22:2181,node23:2181", "spark", topics).map(_._2) val ds1 = lines.flatMap(_.split(" ")).map((_, 1)) val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => { Some(x.sum + y.getOrElse(0)) }) ds2.print() ssc.start() ssc.awaitTermination() } }
Spark向kafka中寫入數據
上文闡述了Spark如何從Kafka中流式的讀取數據,下面我整理向Kafka中寫數據。與讀數據不同,Spark並沒有提供統一的接口用於寫入Kafka,所以我們需要使用底層Kafka接口進行包裝。
最直接的做法我們可以想到如下這種方式:
input.foreachRDD(rdd => // 不能在這里創建KafkaProducer rdd.foreachPartition(partition => partition.foreach{ case x:String=>{ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") println(x) val producer = new KafkaProducer[String,String](props) val message=new ProducerRecord[String, String]("output",null,x) producer.send(message) } } ) )
但是這種方式缺點很明顯,對於每個partition的每條記錄,我們都需要創建KafkaProducer,然后利用producer進行輸出操作,注意這里我們並不能將KafkaProducer的新建任務放在foreachPartition外邊,因為KafkaProducer是不可序列化的(not serializable)。顯然這種做法是不靈活且低效的,因為每條記錄都需要建立一次連接。如何解決呢?
1.首先,我們需要將KafkaProducer利用lazy val的方式進行包裝如下:
import java.util.concurrent.Future import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object KafkaSink { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook { // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) }
2.之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個executor,如下:
// 廣播KafkaSink val kafkaProducer: Broadcast[KafkaSink[String, String]] = { val kafkaProducerConfig = { val p = new Properties() p.setProperty("bootstrap.servers", Conf.brokers) p.setProperty("key.serializer", classOf[StringSerializer].getName) p.setProperty("value.serializer", classOf[StringSerializer].getName) p } log.warn("kafka producer init done!") ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig)) }
這樣我們就能在每個executor中愉快的將數據輸入到kafka當中:
//輸出到kafka segmentedStream.foreachRDD(rdd => { if (!rdd.isEmpty) { rdd.foreach(record => { kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2) // do something else }) } })
Spark streaming+Kafka應用
一般Spark Streaming進行流式處理,首先利用上文我們闡述的Direct方式從Kafka拉取batch,之后經過分詞、統計等相關處理,回寫到DB上(一般為Hbase或者Mysql),由此高效實時的完成每天大量數據的詞頻統計任務。
Spark streaming+Kafka調優
Spark streaming+Kafka的使用中,當數據量較小,很多時候默認配置和使用便能夠滿足情況,但是當數據量大的時候,就需要進行一定的調整和優化,而這種調整和優化本身也是不同的場景需要不同的配置。
1 合理的批處理時間(batchDuration)
幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。如果這個值設置的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。而且,一般對於batchDuration的設置不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,如下圖:
2 合理的Kafka拉取量(maxRatePerPartition重要)
對於Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的,配置參數為:spark.streaming.kafka.maxRatePerPartition。這個參數默認是沒有上線的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time,如下圖:
3 緩存反復使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數,如下圖:
4 設置合理的GC
長期使用Java的小伙伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與內存的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些了解的小伙伴應該知道,在Java虛擬機中,將內存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對內存碎片進行整理,通常采用標記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
5 設置合理的CPU資源數
CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一台host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。但是增加executor的時候需要考慮好內存消耗,因為一台機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。
6 設置合理的parallelism
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般默認設置為Kafka中Partition的數量。
7 使用高性能的算子
這里參考了美團技術團隊的博文,並沒有做過具體的性能測試,其建議如下:
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map
- 使用foreachPartitions替代foreach
- 使用filter之后進行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
8 使用Kryo優化序列化性能
這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。
在Spark中,主要有三個地方涉及到了序列化:
- 在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸(見“原則七:廣播大變量”中的講解)。
- 將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
- 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
結果
經過種種調試優化,我們最終要達到的目的是,Spark Streaming能夠實時的拉取Kafka當中的數據,並且能夠保持穩定,如下圖所示:
當然不同的應用場景會有不同的圖形,這是本文詞頻統計優化穩定后的監控圖,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數Batch都能夠在這一虛線下處理完畢,說明整體Spark Streaming是運行穩定的。