kafka:enable.auto.commit


一、背景

項目中有一個需求,是通過消費kafka的消息來處理數據,但是想要實現延遲消費的效果,於是想到了是否可以自己管理kafka的commit來實現,就是通過設置`enable.auto.commit`為False,預期是如果消費到了消息,但是不commit,kafka就會重新把消息放回隊列,后續還會再次消費到,直到超過設置的延遲時間再真正消費並commit。

於是寫了個demo來驗證,結果發現這個配置的效果並不是自己想要的。

二、生產者

生產者每秒鍾向kafka的topic發送一條消息。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time

from confluent_kafka import Producer, KafkaError
from confluent_kafka import TopicPartition
from confluent_kafka import OFFSET_BEGINNING

p = Producer({'bootstrap.servers':'localhost:9092, localhost:9093, localhost:9094'})

topic = 'nico-test'
msg_tpl = 'hello kafka:{0}'

while True:
    msg = msg_tpl.format(time.time())
    p.produce(topic, msg)
    print('Produce msg:{0}'.format(msg))
    time.sleep(1)

p.flush()

三、消費者

消費者設置了配置項enable.auto.commit:False。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time

from confluent_kafka import Consumer, KafkaError
from confluent_kafka import TopicPartition
from confluent_kafka import OFFSET_BEGINNING

c = Consumer({
    'bootstrap.servers':'localhost:9092, localhost:9093, localhost:9094', 
    'group.id':'nico-test', 
    'auto.offset.reset':'earliest', 
    'enable.auto.commit':False
})

topic = 'nico-test'

c.subscribe([topic])

cd = c.list_topics()
print(cd.cluster_id)
print(cd.controller_id)
print(cd.brokers)
print(cd.topics)
print(cd.orig_broker_id)
print(cd.orig_broker_name)

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue

    print('topic:{topic}, partition:{partition}, offset:{offset}, headers:{headers}, key:{key}, msg:{msg}, timestamp:{timestamp}'.format(topic=msg.topic(), msg=msg.value(), headers=msg.headers(), key=msg.key(), offset=msg.offset(), partition=msg.partition(), timestamp=msg.timestamp()))

四、結果

結果是consumer啟動后會一直順序的消費消息,並且並不會把消息重放到隊列中,但是當consumer被kill掉重啟時,每次都是從最開始的時候消費的,所以總結一下,該配置項的作用是當配置為true時,每次獲取到消息后就會自動更新存儲在zookepper中的offset值。

最后自己也想了一下,這里不支持延遲消費的原因其實和kafka的實現原理有很大的關系,kafka是直接把消息存儲在磁盤文件中的,如果想要實現重放(支持延遲消費)那么就需要把該消息從消息隊列中刪除,然后重新插入到消息隊列,那這樣就跟kafka的設計相違背了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM