本文講Spark Streamming使用Direct方式讀取Kafka,並在輸出(存儲)操作之后提交offset到Kafka里實現程序讀寫操作有且僅有一次,即程序重啟之后之前消費並且輸出過的數據不再重復消費,接着上次消費的位置繼續消費Kafka里的數據。
Spark Streamming+Kafka官方文檔:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
1、提交offset的程序
根據官方文檔可知,在spark代碼里可以獲取對應的offset信息,並且可以提交offset存儲到kafka中。
代碼:
package com.dkl.leanring.spark.kafka import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.TaskContext object KafkaOffsetDemo { def main(args: Array[String]) { //創建sparkConf val sparkConf = new SparkConf().setAppName("KafkaOffsetDemo").setMaster("local[2]") // 創建StreamingContext batch size 為 1秒 val ssc = new StreamingContext(sparkConf, Seconds(1)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "ambari.master.com:6667", //kafka集群地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "KafkaOffsetDemo", //消費者組名 "auto.offset.reset" -> "earliest", //當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,則這個消費者的偏移量會在后台自動提交 val topics = Array("top1") //消費主題 //創建DStream,返回接收到的輸入數據 val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) // 打印獲取到的數據,因為1秒刷新一次,所以數據長度大於0時才打印 stream.foreachRDD(f => { if (f.count > 0) { println("=============================") println("打印獲取到的kafka里的內容") f.foreach(f => { val value = f.value() println(value) }) println("=============================") println("打印offset的信息") // offset val offsetRanges = f.asInstanceOf[HasOffsetRanges].offsetRanges //打印offset f.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } println("=============================") // 等輸出操作完成后提交offset stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } }) //啟動 ssc.start() //等待停止 ssc.awaitTermination() } }
說明:
- auto.offset.reset設置為earliest,即當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始,這樣設置的目的是為了一開始可以獲取到kafka對應主題下的所有的歷史消息。
- enable.auto.commit 設置為false,如果是true,則這個消費者的偏移量會在后台自動提交,這樣設置目的是為了后面自己提交offset,因為如果雖然獲取到了消息,但是后面的轉化操作並將結果寫到如hive中並沒有完成程序就掛了的話,這樣是不能將這次的offset提交的,這樣就可以等程序重啟之后接着上次失敗的地方繼續消費
- group.id 是不能變得,也就是offset是和topic和group綁定的,如果換一個group的話,程序將從頭消費所有的歷史數據
- 這個api是將offset存儲到kakfa的一個指定的topic里,名字為__consumer_offsets,而不是zookeeper中
2、測試程序
1、首先創建對應的topic
2、生產幾條數據作為歷史消息 -
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic top1
3、啟動上面的程序
4、繼續生產幾條數據
接下來先看一下結果:由圖可得,這樣可以將歷史數據全部打印出來,並且后面實時增加的數據,也打印出來了,且可以看到offset是在增加的,最后一個offset是202,那么接下來測試一下程序重啟之后是否會接着之前的數據繼續消費呢
5、停止程序
6、生產幾條數據
7、啟動程序
看一下結果:
可以看出,程序確實是接着上次消費的地方消費的,為了證實這一點,我將earliest和offset圈了起來,從offset可以看到是從上次的202開始消費的。3、關於offset過期時間
kafka offset默認的過期時間是一天,當上面的程序掛掉,一天之內沒有重啟,也就是一天之內沒有保存新的offset的話,那么之前的offset就會被刪除,再重啟程序,就會從頭開始消費kafka里的所有歷史數據,這種情況是有問題的,所以可以通過設置offsets.retention.minutes自定義offset過期時間,該設置單位為分鍾,默認為1440。
修改kafka的offset過期時間詳細信息見:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/4、自己保存offset
可以通過自己保存offset的信息到數據庫里,然后需要時再取出來,根據得到的offset信息消費kafka里的數據,這樣就不用擔心offset的過期的問題了,因為沒有自己寫代碼實現,所以先給出官網的示例代碼:
-
/ The details depend on your data store, but the general idea looks like this // begin from the the offsets committed to the database val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset") }.toMap val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) ) stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val results = yourCalculation(rdd) // begin your transaction // update results // update offsets where the end of existing offsets matches the beginning of this batch of offsets // assert that offsets were updated correctly // end your transaction }