【kafka】confluent_kafka重置offset


之前寫過兩篇關於重置offset的博文,后來使用過程中都有問題。

經過各種嘗試,終於找到了解決方案。

直接上代碼:

# coding=utf8

from confluent_kafka import Consumer, KafkaError, TopicPartition


def reset(topic, group, partition, new_offset):
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    new_offset = int(new_offset) - 1  #從new_offset-1的地方開始消費,這樣消費一條后提交就是new_offset位置
    tp_c = TopicPartition(topic, partition, new_offset)
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': True,  # 把自動提交打開
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.assign([tp_c])
    c.poll()


def reset_offset(topic, group, partition, new_offset):
    while True:
        try:
            reset(topic, group, partition, new_offset)
            break
        except:
            print "ERROR in reset, repeat."
            continue


if __name__ == "__main__":
    reset_offset("test1", "test1", 0, 100)

代碼中的while循環是防止有時連不上kafka服務器報錯。

很奇怪,我直接用c.commit(offsets=[tp_c])不起作用,非要我消費后它自動重置才有效。

 

附,重置offset到最大值的操作。比上面多出了獲取最大值的部分代碼。

# coding=utf8

from confluent_kafka import Consumer, KafkaError, TopicPartition


def reset(topic, group):
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': False,
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.subscribe([topic])  # 這一句必須有,否則后面get_watermark_offsets會報錯 Failed to get watermark offsets: Broker: Leader not available
    tp = TopicPartition(topic, 0)
    committed = c.committed([tp]) # 這一句必須有,否則后面get_watermark_offsets會報錯 Failed to get watermark offsets: Broker: Leader not available
    print "committed: %s" % committed[0].offset
    watermark_offsets = c.get_watermark_offsets(tp)
    print "watermark_offsets:%s %s" % (watermark_offsets[0], watermark_offsets[1])
    new_offset = int(watermark_offsets[1]) - 1
    print new_offset
    tp_c = TopicPartition(topic, 0, new_offset)

    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': True,  # 把自動提交打開
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.assign([tp_c])
    c.poll()


def reset_offset(topic, group):
    while True:
        try:
            reset(topic, group)
            break
        except:
            print "ERROR in reset, repeat."
            continue


if __name__ == "__main__":
    reset_offset("test1", "test1")

注意: subscribe和assign是不能同時使用的。subscribe表示訂閱topic,從kafka記錄的offset開始消費。assign表示從指定的offset開始消費。

問題:

1.為何獲取watermark_offsets必須要使用subscribe和committed,不使用就報錯Failed to get watermark offsets: Broker: Leader not available?

2.c.commit(offsets=[tp]) 在什么情況下有效?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM