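I previously wrote two posts on resetting consumer offsets, but both approaches later ran into problems in practice.
After a lot of trial and error, I finally found a solution that works.
Here is the code: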
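# coding=utf8
from confluent_kafka import Consumer, TopicPartition


def reset(topic, group, partition, new_offset):
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    # Start consuming at new_offset - 1: after one message is consumed and
    # committed, the committed offset (next offset to read) is new_offset.
    new_offset = int(new_offset) - 1
    tp_c = TopicPartition(topic, partition, new_offset)
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': True,  # turn auto-commit on
                  # top-level form; older clients used 'default.topic.config'
                  'auto.offset.reset': 'earliest'})
    try:
        c.assign([tp_c])
        msg = c.poll(10.0)  # wait up to 10 s for one message
        if msg is None or msg.error():
            raise RuntimeError("no message consumed at offset %d" % new_offset)
    finally:
        # close() commits the auto-stored offset before the consumer exits
        c.close()


def reset_offset(topic, group, partition, new_offset):
    # Retry, because connecting to the Kafka brokers sometimes fails.
    while True:
        try:
            reset(topic, group, partition, new_offset)
            break
        except Exception as e:
            print("ERROR in reset, repeat: %s" % e)


if __name__ == "__main__":
    reset_offset("test1", "test1", 0, 100)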
The while loop in the code guards against the occasional error when the client cannot connect to the Kafka brokers.
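The `while True` loop retries forever and swallows every exception, including programming errors. A bounded variant is sketched below, assuming a fixed number of attempts with exponential backoff is acceptable; `retry` and its parameters are hypothetical names, not part of confluent_kafka.

```python
import time


def retry(fn, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn() until it succeeds, at most `attempts` times.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last exception if every attempt fails.
    """
    for i in range(attempts):
        try:
            return fn()
        except Exception as e:  # e.g. brokers temporarily unreachable
            if i == attempts - 1:
                raise
            delay = base_delay * (2 ** i)
            print("ERROR in reset (attempt %d/%d): %s; retrying in %.1fs"
                  % (i + 1, attempts, e, delay))
            sleep(delay)
```

Usage would be `retry(lambda: reset("test1", "test1", 0, 100))`. The `sleep` parameter exists only so the backoff can be replaced in tests.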
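Strangely, calling c.commit(offsets=[tp_c]) directly had no effect; the offset was only reset once a message had actually been consumed and auto-commit kicked in.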
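One plausible explanation, not confirmed in this post: `Consumer.commit()` is asynchronous by default, so if the script exits right after calling it without `close()`, the commit request may never reach the broker. The sketch below commits synchronously with `asynchronous=False` and checks each partition's result. The function names are hypothetical, and it assumes no other member of the group is active, since the broker typically rejects manual commits for a group with live members.

```python
def commit_offsets_sync(consumer, offsets):
    """Synchronously commit `offsets` (a list of TopicPartition) and
    raise if the broker reports an error for any partition."""
    # asynchronous=False blocks until the broker answers and returns the
    # committed TopicPartition list (a request-level failure raises KafkaException).
    results = consumer.commit(offsets=offsets, asynchronous=False)
    failed = [tp for tp in results if tp.error is not None]
    if failed:
        raise RuntimeError("commit failed for %s" % failed)
    return results


def reset_offset_sync(brokers, topic, group, partition, new_offset):
    """Set the group's committed offset directly, without consuming."""
    # Imported here so the helper above can be exercised without the library.
    from confluent_kafka import Consumer, TopicPartition
    tp = TopicPartition(topic, partition, new_offset)
    c = Consumer({'bootstrap.servers': brokers,
                  'group.id': group,
                  'enable.auto.commit': False})
    try:
        c.assign([tp])
        return commit_offsets_sync(c, [tp])
    finally:
        c.close()
```

Note that here `new_offset` is committed as-is, with no `- 1`, because nothing is consumed.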
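Appendix: resetting the offset to the maximum (the high watermark). Compared with the code above, this adds the part that fetches the maximum offset.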
# coding=utf8
from confluent_kafka import Consumer, TopicPartition


def reset(topic, group):
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': False,
                  'auto.offset.reset': 'earliest'})
    try:
        # subscribe() and committed() are both required here; without them
        # get_watermark_offsets fails with
        # "Failed to get watermark offsets: Broker: Leader not available".
        c.subscribe([topic])
        tp = TopicPartition(topic, 0)
        committed = c.committed([tp])
        print("committed: %s" % committed[0].offset)
        low, high = c.get_watermark_offsets(tp)
        print("watermark_offsets: %s %s" % (low, high))
    finally:
        # Close the probe consumer before the second consumer starts.
        c.close()

    # Start one message before the high watermark, so that after consuming
    # the last message the committed offset equals the high watermark.
    new_offset = int(high) - 1
    print(new_offset)
    tp_c = TopicPartition(topic, 0, new_offset)
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'enable.auto.commit': True,  # turn auto-commit on
                  'auto.offset.reset': 'earliest'})
    try:
        c.assign([tp_c])
        msg = c.poll(10.0)  # wait up to 10 s for one message
        if msg is None or msg.error():
            raise RuntimeError("no message consumed at offset %d" % new_offset)
    finally:
        c.close()  # commits the auto-stored offset


def reset_offset(topic, group):
    # Retry, because connecting to the Kafka brokers sometimes fails.
    while True:
        try:
            reset(topic, group)
            break
        except Exception as e:
            print("ERROR in reset, repeat: %s" % e)


if __name__ == "__main__":
    reset_offset("test1", "test1")
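Both scripts rely on the same arithmetic: a committed offset is the offset of the next message the group will read, so seeking to N - 1 and consuming one message leaves N committed. That only works when a message exists at N - 1, i.e. low < N <= high for watermarks (low, high). For N equal to low, the seek is out of range and `auto.offset.reset` decides where consumption starts; for N = 0 the seek offset is -1, which librdkafka treats as the logical offset OFFSET_END. The hypothetical helpers below check that range and compute lag from a `committed()` result; the -1001 constant mirrors `confluent_kafka.OFFSET_INVALID`, which `committed()` reports when the group has no committed offset.

```python
OFFSET_INVALID = -1001  # same value as confluent_kafka.OFFSET_INVALID


def seek_offset_for_target(target, low, high):
    """Return the offset to assign so that consuming one message commits `target`.

    `low`/`high` are the partition watermarks from get_watermark_offsets().
    A message must exist at target - 1, i.e. low < target <= high.
    """
    if not low < target <= high:
        raise ValueError("target %d outside (%d, %d]: no message at target - 1"
                         % (target, low, high))
    return target - 1


def consumer_lag(committed, high):
    """Number of messages between the committed offset and the high watermark.

    Returns None when the group has no committed offset (OFFSET_INVALID)
    or any other negative logical offset.
    """
    if committed < 0:
        return None
    return max(high - committed, 0)
```

In the second script, `seek_offset_for_target(high, low, high)` yields the same value as `watermark_offsets[1] - 1`, but fails loudly on an empty partition instead of seeking to an invalid position.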
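Note: subscribe and assign cannot be used together on the same consumer. subscribe subscribes to a topic and consumes from the offset Kafka has recorded for the group; assign consumes from an explicitly specified offset.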
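If you need group subscription and a chosen start offset at the same time, confluent_kafka's `subscribe()` accepts an `on_assign` callback; calling `assign()` inside that callback (rather than alongside `subscribe()`) overrides the starting offsets of the partitions the group hands out. A minimal sketch, where `make_on_assign` and the offset mapping are hypothetical names:

```python
def make_on_assign(start_offsets):
    """Build an on_assign callback for Consumer.subscribe().

    start_offsets maps (topic, partition) -> offset; partitions not in the
    map keep the offset the group assigned (normally the committed one).
    """
    def on_assign(consumer, partitions):
        for tp in partitions:
            key = (tp.topic, tp.partition)
            if key in start_offsets:
                tp.offset = start_offsets[key]
        # Re-assign with the modified offsets from inside the rebalance callback.
        consumer.assign(partitions)
    return on_assign
```

Usage: `c.subscribe(["test1"], on_assign=make_on_assign({("test1", 0): 100}))`, then poll as usual. The consumer stays a group member, so later commits go through the normal group protocol.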
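Open questions:
1. Why does get_watermark_offsets require subscribe and committed to be called first, failing otherwise with "Failed to get watermark offsets: Broker: Leader not available"?
2. Under what circumstances does c.commit(offsets=[tp]) take effect?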
