java 管理kafka偏移量_Kafka偏移量(Offset)管理


1.定義

Kafka中的每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序號,用於partition唯一標識一條消息。

Offset記錄着下一條將要發送給Consumer的消息的序號。

流處理系統常見的三種語義:

最多一次

每個記錄要么處理一次,要么根本不處理

至少一次

這比最多一次強,因為它確保不會丟失任何數據。但是可能有重復的

有且僅有一次

每條記錄將被精確處理一次,沒有數據會丟失,也沒有數據會被多次處理The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

At most once: Each record will be either processed once or not processed at all.

At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensure that no data will be lost. But there may be duplicates.

Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

2.Kafka offset Management with Spark Streaming

Offset首先建議存放到Zookeeper中,Zookeeper相比於HBASE等來說更為輕量級,且是做HA(高可用性集群,High Available)的,offset更安全。

對於offset管理常見的兩步操作:

保存offsets

獲取offsets

 

3.環境准備

啟動一個Kafka生產者,測試使用topic:tp_kafka:

./kafka-console-producer.sh --broker-list hadoop000:9092 --topic tp_kafka

啟動一個Kafka消費者:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_kafka

在IDEA中生產數據:

package com.taipark.spark;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

import java.util.UUID;

public class KafkaApp {
public static void main(String[] args) {
String topic = "tp_kafka";

Properties props = new Properties();

props.put("serializer.class","kafka.serializer.StringEncoder");

props.put("metadata.broker.list","hadoop000:9092");

props.put("request.required.acks","1");

props.put("partitioner.class","kafka.producer.DefaultPartitioner");

Producer producer = new Producer<>(new ProducerConfig(props));

for(int index = 0;index <100; index++){
KeyedMessage message = new KeyedMessage<>(topic, index + "", "taipark" + UUID.randomUUID());

producer.send(message);

}

System.out.println("數據生產完畢");

}

}

 

4.第一種offset管理方式:smallest

Spark Streaming鏈接Kafka統計個數:

package com.taipark.spark.offset

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

object Offset01App {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")

val ssc = new StreamingContext(sparkConf,Seconds(10))

val kafkaParams = Map[String, String](

"metadata.broker.list" -> "hadoop000:9092",

"auto.offset.reset" -> "smallest"

)

val topics = "tp_kafka".split(",").toSet

val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

messages.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
println("Taipark" + rdd.count())

}

})

ssc.start()

ssc.awaitTermination()

}

}

 

再生產100條Kafka數據->Spark Streaming接受:

 

但這時如果Spark Streaming停止后重啟:

 

會發現這里重頭開始計數了,原因是代碼里將auto.offset.reset的值設置為了smallest。(kafka-0.10.1.X版本之前)

5.第二種offset管理方式:checkpoint

在HDFS中創建一個/offset文件夾:

hadoop fs -mkdir /offset

使用Checkpoint:

package com.taipark.spark.offset

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object Offset01App {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")

val kafkaParams = Map[String, String](

"metadata.broker.list" -> "hadoop000:9092",

"auto.offset.reset" -> "smallest"

)

val topics = "tp_kafka".split(",").toSet

val checkpointDirectory = "hdfs://hadoop000:8020/offset/"

def functionToCreateContext():StreamingContext = {
val ssc = new StreamingContext(sparkConf,Seconds(10))

val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

//設置checkpoint

ssc.checkpoint(checkpointDirectory)

messages.checkpoint(Duration(10*1000))

messages.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
println("Taipark" + rdd.count())

}

})

ssc

}

val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _)

ssc.start()

ssc.awaitTermination()

}

}

注:IDEA修改HDFS用戶,在設置里的VM options中:

-DHADOOP_USER_NAME=hadoop

先啟動:

 

發現消費了之前的100條。這是停止之后,生產100條,再啟動:

 

發現這里只讀取了上次結束到這次啟動之間的100條,而不是像smallest一樣讀取之前所有條數。

但是checkpiont存在問題,如果采用這種方式管理offset,只要業務邏輯發生了變化,則checkpoint就沒有作用了。因為其調用的是getOrCreate()。

6.第三種offset管理方式:手動管理偏移量

思路:

創建StreamingContext

從Kafka獲取數據 <== 拿到offset

根據業務邏輯進行處理

將處理結果寫入外部存儲 ==>保存offset

啟動程序等待線程終止package com.taipark.spark.offset

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

import org.apache.spark.streaming.{Seconds, StreamingContext}

object Offset01App {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")

val ssc = new StreamingContext(sparkConf,Seconds(10))

val kafkaParams = Map[String, String](

"metadata.broker.list" -> "hadoop000:9092",

"auto.offset.reset" -> "smallest"

)

val topics = "tp_kafka".split(",").toSet

//從某地獲取偏移量

val fromOffsets = Map[TopicAndPartition,Long]()

val messages = if(fromOffsets.size == 0){ //從頭消費

KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

}else{ //從指定偏移量消費

val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key,mm.message())

KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)

)

}

messages.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
//業務邏輯

println("Taipark" + rdd.count())

//將offset提交保存到某地

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

offsetRanges.foreach(x =>{
//提交如下信息提交到外部存儲

println(s"${x.topic} ${x.partition} ${x.fromOffset} ${x.untilOffset}")

})

}

})

ssc.start()

ssc.awaitTermination()

}

}

先保存offset后保存數據可能導致數據丟失

先保存數據后保存offset可能導致數據重復執行

 

解決方式1:實現冪等(idempotent)

在編程中一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。

解決方式2:事務 (transaction)

1.數據庫事務可以包含一個或多個數據庫操作,但這些操作構成一個邏輯上的整體。

2.構成邏輯整體的這些數據庫操作,要么全部執行成功,要么全部不執行。

3.構成事務的所有操作,要么全都對數據庫產生影響,要么全都不產生影響,即不管事務是否執行成功,數據庫總能保持一致性狀態。

4.以上即使在數據庫出現故障以及並發事務存在的情況下依然成立。

將業務邏輯與offset保存放在一個事務里,僅執行一次。

7.Kafka-0.10.1.X版本之后的auto.kafka.reset:

earliest

當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

latest

當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據

none

topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

標簽:String,val,Offset,偏移量,kafka,offset,import,Kafka,ssc

來源: https://blog.csdn.net/qq_36329973/article/details/104825902


————————————————
版權聲明:本文為CSDN博主「weixin_39896256」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_39896256/article/details/114950014


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM