auto.offset.reset關乎kafka數據的讀取,是一個非常重要的設置。常用的二個值是latest和earliest,默認是latest。
一,latest和earliest區別
1,earliest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
2,latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
提交過offset,latest和earliest沒有區別,但是在沒有提交offset情況下,用latest直接會導致無法讀取舊數據。
二,創建topic
- # bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank
- Created topic "tank".
- # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank
- Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:
- Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
- Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
- Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
三,生產數據和接收生產數據
- [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank
- >1
- >2
- >3
- >4
- >5
- >6
- 。。。。。。。。。省略。。。。。。。。。
- [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning
- 1
- 2
- 3
- 4
- 5
- 6
- 。。。。。。。。省略。。。。。。。。
四,測試代碼
- object tank {
- def main(args: Array[String]): Unit = {
- val pros: Properties = new Properties
- pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")
- /*分組由消費者決定,完全自定義,沒有要求*/
- pros.put("group.id", "tank")
- //設置為true 表示offset自動托管到kafka內部的一個特定名稱為__consumer_offsets的topic
- pros.put("enable.auto.commit", "false")
- pros.put("auto.commit.interval.ms", "1000")
- pros.put("max.poll.records", "5")
- pros.put("session.timeout.ms", "30000")
- //只有當offset不存在的時候,才用latest或者earliest
- pros.put("auto.offset.reset", "latest")
- pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
- pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
- val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)
- /*這里填寫主題名稱*/
- consumer.subscribe(util.Arrays.asList("tank"))
- val system = akka.actor.ActorSystem("system")
- system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))
- }
- object tankTest {
- def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {
- val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
- if (!records.isEmpty) {
- for (record <- records) {
- if (record.value != null && !record.value.equals("")) {
- myLog.syncLog(record.value + "\t准備開啟消費者出列數據", "kafka", "get")
- }
- }
- consumer.commitSync()
- }
- }
- }
- }
五,測試1,過程如下
1,查看offset
- # bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe
- Error: Consumer group 'tank' does not exist.
在沒有提交offset的情況,會報這個錯誤
2,latest模式運行,拉取不到數據
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-1 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-0 to offset 11.
2019-04-28 16:22:55 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=tank] Resetting offset for partition tank-2 to offset 11.
3,再用kafka-console-producer.sh生產數據,latest是可以拉到的,並且是拉取最新的數據(程序運行以后的數據),以前提交的數據是拉取不到的。
4,查看offset不報錯了
- # bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --group tank --describe
- Consumer group 'tank' has no active members.
- TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- tank 1 12 14 2 - - -
- tank 0 12 14 2 - - -
- tank 2 13 15 2 - - -
5,將auto.offset.reset設置成earliest,第一次生產的數據也取不到
在這里要注意:如果kafka只接收數據,從來沒來消費過,程序一開始不要用latest,不然以前的數據就接收不到了。應當先earliest,然后二都都可以。
六,測試2
1,重新創建topic,重復上面的第二,第三步
2,代碼端先earliest,最早提交的數據是可以獲取到的,再生產數據也是可以獲取到的。
3,將auto.offset.reset設置成latest,再生產數據也是可以獲取到的。
七,結論
雖然auto.offset.reset默認是latest,但是建議使用earliest。
參考鏈接:http://blog.51yip.com/hadoop/2130.html