Offset management in Spark Streaming
- When the Direct DStream is initialized, it must be given the offset of every partition of every topic, so that it knows from which position to start reading data.
- These offsets are the ones saved in step 4 below.
- Read and process the messages.
- Store the result data once processing is complete.
- The dashed circle around "store and commit offsets" simply emphasizes that users may perform extra steps to satisfy stricter semantics, such as making the output idempotent or storing the offsets atomically.
- Finally, persist the offsets in an external store such as HBase, Kafka, HDFS, or ZooKeeper (the whole cycle is sketched below).
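A minimal sketch of this read-process-store-commit cycle, assuming the spark-streaming-kafka-0-10 API (ssc, fromOffsets, kafkaParams and the saveOffsets helper are placeholders for whichever external store is chosen):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// fromOffsets: Map[TopicPartition, Long] recovered from the external store by a previous run (step 4)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // offsets covered by this batch
  val processed = rdd.map(record => record.value())                 // read and process the messages
  processed.foreachPartition(part => part.foreach(println))         // store the result data (placeholder output)
  saveOffsets(offsetRanges)                                         // hypothetical helper: persist the offsets externally
}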
Problems with using checkpoints in Spark Streaming
When Spark Streaming processes data from Kafka, there is the problem of managing the Kafka offsets:
- The official solution is checkpointing:
- A checkpoint saves the metadata of the running Spark Streaming application and the state of each batch's RDDs to a persistent store, usually HDFS or S3, and this of course includes the offsets. If the program or the cluster crashes, the next start can recover from the checkpoint, which is how 24/7 availability is achieved in production. Note that storing checkpoints on HDFS also creates a small-files problem.
But the biggest drawback of checkpointing is that once the code or configuration of your streaming program changes, or you roll out a new feature, you stop the old Spark Streaming job, then package, build and launch the new one, and one of two things happens:
- (1) Startup fails with a deserialization exception.
- (2) Startup succeeds, but the code that actually runs is still the previous version of the program.
Why do these two situations occur?
- This is because when the checkpoint is first persisted, the whole application state, including the serialized code (closures) from your jar, is written out as binary data, and every restart recovers from it. After you package a new program, what gets deserialized and loaded is still the old serialized data, which leads either to the startup error or to the old code still running. Some people may ask: in that case, why not simply delete the old checkpoint so the new program can start? It will indeed start, but once the old checkpoint is deleted the new program can only start consuming from Kafka's smallest or largest offset (the default is the latest). Starting from the latest offset, rather than from the offset at which the previous run stopped, loses data; starting from the earliest offset processes data twice. Either way there is a problem.
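For reference, the official checkpoint-based recovery pattern looks roughly like the following minimal sketch (sparkConf, the checkpoint path and the body of createContext are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // metadata and RDD state, including offsets, are written here
  // ... build the Kafka direct stream and the processing graph here ...
  ssc
}

// Recover from the checkpoint if it exists, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate("hdfs:///tmp/streaming-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()

It is exactly this recovery step that deserializes the old job graph, which is why upgraded code is not picked up.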
See the Spark programming guide on upgrading application code: https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code
For this problem, the Spark documentation offers two workarounds:
(1) Do not stop the old job: start the new program alongside it and let both consume in parallel for a while. Comment: duplicate consumption is still possible.
(2) When shutting down, record the last offsets, and have the newly started program read those offsets and continue from them, so that no messages are lost. Comment: the documentation does not describe concrete steps, only the idea: store the offsets yourself.
- Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.
Java:
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
The idea is in this pseudocode: use a data store that supports transactions, update the results and the offsets inside the same transaction, and assert that the offsets were updated correctly; a concrete sketch follows below.
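As a concrete illustration of this pseudocode (not the official example), a minimal JDBC-based sketch could look like the one below; stream is a Scala direct DStream as in the later snippets, and the connection string, the kafka_offsets table and the saveResults helper are assumptions:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map(record => record.value()).collect() // assume the batch result is small enough for the driver
  val conn = java.sql.DriverManager.getConnection("jdbc:postgresql://dbhost:5432/app", "user", "pass") // placeholder
  try {
    conn.setAutoCommit(false)                               // begin your transaction
    saveResults(conn, results)                              // hypothetical helper: update results
    val ps = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND partition = ? AND until_offset = ?")
    offsetRanges.foreach { or =>
      ps.setLong(1, or.untilOffset)
      ps.setString(2, or.topic)
      ps.setInt(3, or.partition)
      ps.setLong(4, or.fromOffset)                          // only advances if the stored end matches this batch's start
      if (ps.executeUpdate() != 1)                          // assert that offsets were updated correctly
        throw new IllegalStateException("offset gap or overlap detected for " + or)
    }
    conn.commit()                                           // end your transaction
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}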
Ways of managing Kafka offsets in Spark Streaming
Managing Kafka offsets in Spark Streaming means storing the offsets somewhere in some data format. The usual options are the following:
1. Storing them in Kafka
Apache Spark 2.1.x with spark-streaming-kafka-0-10 uses the new consumer API, which provides an asynchronous commit API. Once you have made sure the processed output has been safely stored, you can use the commitAsync API to commit the offsets back to Kafka. The new consumer API commits offsets under the consumer group id.
Committing the offsets to Kafka:
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Note: commitAsync() is part of the Spark Streaming kafka-0-10 integration; the Spark documentation points out that it is still an experimental API and may change.
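Whichever approach below is used to commit offsets manually, the consumer should not be allowed to auto-commit them; a typical kafkaParams map looks like the following (the broker address and group id are placeholders):

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-broker:9092",   // placeholder broker list
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "group.id" -> "my-consumer-group",            // placeholder; commitAsync commits offsets under this id
  "auto.offset.reset" -> "latest",              // only consulted when no stored offsets are supplied
  "enable.auto.commit" -> (false: java.lang.Boolean)) // offsets are committed manually instead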
2. Storing them in ZooKeeper
The old (high-level) Kafka consumer stores its offsets in ZooKeeper anyway; in Spark Streaming you only need to read the offsets from ZooKeeper explicitly at startup. Reference code follows.
Step 1: initialize a ZooKeeper connection to retrieve the offsets from ZooKeeper.
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
Method for retrieving the last offsets stored in ZooKeeper for the consumer group and topic list:
def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
  val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
  val partitionMap = zkUtils.getPartitionsForTopics(topics)
  // /consumers/<groupId>/offsets/<topic>/
  partitionMap.foreach(topicPartitions => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
    topicPartitions._2.foreach(partition => {
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
      try {
        val offsetStatTuple = zkUtils.readData(offsetPath)
        if (offsetStatTuple != null) {
          LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}",
            Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
            offsetStatTuple._1.toLong)
        }
      } catch {
        case e: Exception =>
          LOGGER.warn("retrieving offset details - no previous node exists: {}, topic: {}, partition: {}, node path: {}",
            Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
      }
    })
  })
  topicPartOffsetMap.toMap
}
Step 2: use the retrieved offsets to initialize the Kafka Direct DStream.
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))
- Method for persisting the recoverable offsets back to ZooKeeper.
Note: Kafka offsets are stored in ZooKeeper under the path /consumers/[groupId]/offsets/[topic]/[partitionId], and the stored value is the offset.
def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
  offsets.foreach(or => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
    val acls = new ListBuffer[ACL]()
    val acl = new ACL
    acl.setId(ANYONE_ID_UNSAFE)
    acl.setPerms(PERMISSIONS_ALL)
    acls += acl
    val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    zkUtils.updatePersistentPath(offsetPath, offsetVal + "", JavaConversions.bufferAsJavaList(acls))
    LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}",
      Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
  })
}
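To tie readOffsets and persistOffsets together, the driver loop might look like the following sketch (same assumptions as the snippets above; processMessage is a placeholder):

val fromOffsets = readOffsets(topics, groupId)
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))

inputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(record => processMessage(record)).count()             // process and materialize the batch
  persistOffsets(offsetRanges, groupId, storeEndOffset = true)  // store untilOffset so the next run resumes after this batch
}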
3. Storing them in HBase
- DDL: rows expire after 30 days
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
- RowKey Layout
row: <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family: offsets
qualifier: <PARTITION_ID>
value: <OFFSET_ID>
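For example (hypothetical values), the offsets of a three-partition topic topic1 consumed by group group1 in the batch at epoch time 1500000000000 would be stored as a single row:
row: topic1:group1:1500000000000
offsets:0 = 110, offsets:1 = 95, offsets:2 = 102   (one qualifier per partition, value = untilOffset)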
For each batch of messages, the saveOffsets() function is used to persist the last read offsets of a given Kafka topic in HBase.
/*
Save offsets for each batch into HBase
*/
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
  val put = new Put(rowKey.getBytes)
  for (offset <- offsetRanges) {
    put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
      Bytes.toBytes(offset.untilOffset.toString))
  }
  table.put(put)
  conn.close()
}
Before the streaming job starts processing, getLastCommittedOffsets() is used to read from HBase the offsets that were saved when the previous run ended. The method handles the usual scenarios and returns the Kafka topic-partition offsets:
Scenario 1: the streaming job is started for the first time. It gets the number of partitions of the topic from ZooKeeper, sets the offset of every partition to 0, and returns.
Scenario 2: a long-running streaming job was stopped and new partitions were added to the topic. It gets the number of partitions from ZooKeeper; for all old partitions it keeps the offsets saved in HBase, and for the new partitions it sets the offset to 0.
Scenario 3: a long-running streaming job was stopped and the topic partitions are unchanged. It simply uses the offsets saved in HBase.
Note that if new partitions are added to the topic after the Spark Streaming application has started, the application only reads data from the partitions that existed at startup; to read the new partitions it has to be restarted.
/*
Returns the last committed offsets for all partitions of a given topic from HBase,
covering the scenarios described above.
*/
def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String,
    zkQuorum: String, zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
  val hbaseConf = HBaseConfiguration.create()
  val zkUrl = zkQuorum + "/" + zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
  zkClientAndConnection._1.close()
  zkClientAndConnection._2.close()

  // Connect to HBase to retrieve the last committed offsets
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
  val scan = new Scan()
  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
  val result = scanner.next()
  // Number of partitions for the topic discovered in HBase; 0 if nothing has been saved yet
  var hbaseNumberOfPartitionsForTopic = 0
  if (result != null) {
    // If the scanner returned a row, the number of partitions equals the number of cells in that row
    hbaseNumberOfPartitionsForTopic = result.listCells().size()
  }
  val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
  if (hbaseNumberOfPartitionsForTopic == 0) {
    // Initialize fromOffsets to the beginning
    for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
    }
  } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
    // Handle the scenario where new partitions have been added to an existing Kafka topic
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
    }
  } else {
    // Initialize fromOffsets from the last run
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
  }
  scanner.close()
  conn.close()
  fromOffsets.toMap
}
Once we have the offsets, we can create the Kafka Direct DStream:
val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,
zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)
val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))
After the processing of each batch is finished, saveOffsets() is called to save the offsets.
/*
For each RDD in a DStream apply a map transformation that processes the message.
*/
inputDStream.foreachRDD((rdd, batchTime) => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
  val newRDD = rdd.map(message => processMessage(message))
  newRDD.count()
  saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
})
Reference code: https://github.com/gdtm86/spark-streaming-kafka-cdh511-testing
Summary
In summary, maintaining the offsets in ZooKeeper is the recommended approach.
References
- Offset Management For Apache Kafka With Apache Spark Streaming
- On the drawbacks of Spark Streaming checkpoints
- [Translation] Approaches to managing Kafka offsets in Spark Streaming
Tip: this article is a record of my own learning and practice; many figures and passages were copied from online articles, and I apologize for any missing attributions. If you find any problem, please leave a comment or send me an email and I will reply promptly.