Spark Streaming讀取Kafka數據的兩種方式


Kafka在0.80.10之間引入了一種新的消費者API,因此,Spark Streaming與Kafka集成,有兩種包可以選擇: spark-streaming-kafka-0-8spark-streaming-kafka-0-10。在使用時應注意以下幾點:

  1. spark-streaming-kafka-0-8兼容Kafka 0.8.2.1及以后的版本, 從Spark 2.3.0開始,對Kafka 0.8支持已被標記為過時。

  2. spark-streaming-kafka-0-10兼容Kafka 0.10.0及以后的版本, 從Spark 2.3.0開始, 此API是穩定版。

  3. 如果Kafka版本大於等於0.10.0,且Spark版本大於等於Spark 2.3.0,應使用spark-streaming-kafka-0-10

本文總結spark-streaming-kafka-0-8中兩種讀取Kafka數據的方式:createStreamcreateDirectStream

基於Receiver方式

POM依賴

 1 <dependencies>
 2      <!--spark-streaming-->
 3      <dependency>
 4          <groupId>org.apache.spark</groupId>
 5          <artifactId>spark-streaming_2.11</artifactId>
 6          <version>2.2.2</version>
 7      </dependency>
 8 
 9      <!--spark-streaming-kafka-plugin-->
10      <dependency>
11          <groupId>org.apache.spark</groupId>
12          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
13          <version>2.2.2</version>
14      </dependency>
15  </dependencies>

示例一

 1 // 1、Kafka配置
 2  // 配置zookeeper集群、消費者組
 3  val kafkaParams = Map(
 4    "zookeeper.connect" -> "localhost:2181",
 5    "group.id" -> groupID)
 6 
 7  // 2、topic_name與numThreads的映射
 8  // topic有幾個partition,就寫幾個numThreads。
 9  // 每個partition對應一個單獨線程從kafka取數據到Spark Streaming
10  val topics = Map(topicName -> numThreads)
11 
12  // 3、ReceiverInputDStream
13  // 注意:應先import kafka.serializer.StringDecoder再import org.apache.spark.streaming._
14  val kafkaStream= KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
15    ssc,
16    kafkaParams,
17    topics,
18    StorageLevel.MEMORY_AND_DISK_SER_2)

示例二

 1 // 1、topic_name與numThreads的映射
 2  // topic有幾個partition,就寫幾個numThreads。
 3  // 每個partition對應一個單獨線程從kafka取數據到Spark Streaming
 4  val topics = Map(topicName -> numThreads)
 5 
 6  // 2、ReceiverInputDStream
 7  // 底層先根據zkQuorum、groupId 構造kafkaParams,
 8  // 然后再調用createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
 9  val kafkaStream=KafkaUtils.createStream(
10    ssc=ssc,
11    zkQuorum="localhost:2181",
12    groupId = groupID,
13    topics,
14    StorageLevel.MEMORY_AND_DISK_SER_2
15  )

特點

  1. 需要使用單獨的Receiver線程來異步獲取Kafka數據。

  2. Receiver底層實現中使用了Kafka高級消費者API,因此,不需要自己管理Offset,只需指定Zookeeper和消費者組GroupID,系統便會自行管理。

  3. 執行過程: Spark Streaming啟動時,會在Executor中同時啟動Receiver異步線程用於從Kafka持續獲取數據,獲取的數據先存儲在Receiver中(存儲方式由StorageLevel決定),后續,當Batch Job觸發后,這些數據會被轉移到剩下的Executor中被處理。處理完畢后,Receiver會自動更新Zookeeper中的Offset。

  4. 默認情況下,程序失敗或Executor宕掉后可能會丟失數據,為避免數據丟失,可啟用預寫日志(Write Ahead Log,WAL)。將Receiver收到的數據再備份一份到更可靠的系統如HDFS分布式文件中,以冗余的數據來換取數據不丟失。

  5. 生產下,為保證數據完全不丟失,一般需要啟用WAL。啟用WAL,在數據量較大,網絡不好情況下,會嚴重降低性能。


基於Direct(No Receiver)方式

POM依賴

 1 <dependencies>
 2      <!--spark-streaming-->
 3      <dependency>
 4          <groupId>org.apache.spark</groupId>
 5          <artifactId>spark-streaming_2.11</artifactId>
 6          <version>2.3.1</version>
 7      </dependency>
 8 
 9      <!--spark-streaming-kafka-plugin-->
10      <dependency>
11          <groupId>org.apache.spark</groupId>
12          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
13          <version>2.3.1</version>
14      </dependency>
15  </dependencies>

示例一

 1 // 1、Kafka配置
 2  // auto.offset.reset=latest 無提交的offset時,從最新的開始消費
 3  // enable.auto.commit=false 禁用后台自動提交offset,自己手動管理
 4  val kafkaParams = Map[String, Object](
 5    "bootstrap.servers" -> "localhost:9092",
 6    "key.deserializer" -> classOf[StringDeserializer],
 7    "value.deserializer" -> classOf[StringDeserializer],
 8    "auto.offset.reset" -> "latest",
 9    "enable.auto.commit" -> (false: java.lang.Boolean),
10    "group.id" -> groupID)
11 
12  // 2、DirectKafkaInputDStream
13  // LocationStrategies:本地策略。為提升性能,可指定Kafka Topic Partition的消費者所在的Executor。
14  // LocationStrategies.PreferConsistent:一致性策略。一般情況下用這個策略就OK。將分區盡可能分配給所有可用Executor。
15  // LocationStrategies.PreferBrokers:特殊情況,如果Executor和Kafka Broker在同一主機,則可使用此策略。
16  // LocationStrategies.PreferFixed:特殊情況,當Kafka Topic Partition負荷傾斜,可用此策略,手動指定Executor來消費特定的Partition.
17  // ConsumerStrategies:消費策略。
18  // ConsumerStrategies.Subscribe/SubscribePattern:可訂閱一類Topic,且當新Topic加入時,會自動訂閱。一般情況下,用這個就OK。
19  // ConsumerStrategies.Assign:可指定要消費的Topic-Partition,以及從指定Offset開始消費。
20  val kafkaStream=KafkaUtils.createDirectStream[String,String](
21    ssc,
22    LocationStrategies.PreferConsistent,
23    ConsumerStrategies.Subscribe[String,String](List(topicName),kafkaParams)
24  )

示例二

 1 // 1、Kafka配置
 2  // auto.offset.reset=latest 無提交的offset時,從最新的開始消費
 3  // enable.auto.commit=false 禁用后台自動提交offset,自己手動管理
 4  val kafkaParams = Map[String, Object](
 5    "bootstrap.servers" -> "localhost:9092",
 6    "key.deserializer" -> classOf[StringDeserializer],
 7    "value.deserializer" -> classOf[StringDeserializer],
 8    "auto.offset.reset" -> "latest",
 9    "enable.auto.commit" -> (false: java.lang.Boolean),
10    "group.id" -> groupID)
11 
12  // 2、DirectKafkaInputDStream
13  // LocationStrategies.PreferConsistent:一致性策略。
14  // ConsumerStrategies.Assign:從指定Topic-Partition的Offset開始消費。
15  val initOffset=Map(new TopicPartition(topicName,0)->10L)
16  val kafkaStream=KafkaUtils.createDirectStream[String,String](
17    ssc,
18    LocationStrategies.PreferConsistent,
19    ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
20  )

特點

  1. 不需要使用單獨的Receiver線程從Kafka獲取數據。

  2. 使用Kafka簡單消費者API,不需要ZooKeeper參與,直接從Kafka Broker獲取數據。

  3. 執行過程:Spark Streaming Batch Job觸發時,Driver端確定要讀取的Topic-Partition的OffsetRange,然后由Executor並行從Kafka各Partition讀取數據並計算。

  4. 為保證整個應用EOS, Offset管理一般需要借助外部存儲實現。如Mysql、HBase等。

  5. 由於不需要WAL,且Spark Streaming會創建和Kafka Topic Partition一樣多的RDD Partition,且一一對應,這樣,就可以並行讀取,大大提高了性能。

  6. Spark Streaming應用啟動后,自己通過內部currentOffsets變量跟蹤Offset,避免了基於Receiver的方式中Spark Streaming和Zookeeper中的Offset不一致問題。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM