該博文方法有問題,正確方案在http://www.cnblogs.com/dplearning/p/7992994.html
將指定group對應的offset重置到最大值,跳過未消費數據
代碼如下:
# coding:utf-8
from confluent_kafka import Consumer, TopicPartition


def reset_kafka_offset(group, topic):
    """Reset the committed offset of `group` on partition 0 of `topic` to the
    high watermark, so the group skips every currently unconsumed message.

    Only partition 0 is handled, matching the original script's scope.

    :param group: consumer group id whose offset is reset
    :param topic: topic name (partition 0 assumed)
    """
    broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.x:9092"
    c = Consumer({
        'bootstrap.servers': broker_list,
        'group.id': group,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
    })
    try:
        c.subscribe([topic])
        tp = TopicPartition(topic, 0)
        init_offset = c.committed([tp])[0].offset
        # -1001 (OFFSET_INVALID) means this group has never committed anything.
        # A brand-new group must consume and commit at least one message first;
        # otherwise committing a specific offset has no effect and committed()
        # keeps returning -1001 before and after the reset.
        if int(init_offset) == -1001:
            # Use a timeout so we don't block forever on an empty topic;
            # poll() returns None on timeout, so guard before calling .error().
            msg = c.poll(10.0)
            if msg is not None and not msg.error():
                c.commit()
        watermark_offsets = c.get_watermark_offsets(tp)  # (low, high) offsets
        print(watermark_offsets)
        if watermark_offsets:
            logsize = watermark_offsets[1]  # high watermark = max offset
            if logsize is not None:
                tp1 = TopicPartition(topic, 0, int(logsize))
                # Commit the high watermark directly, skipping unconsumed data.
                # `asynchronous` replaces the old `async` kwarg, which is a
                # reserved word (SyntaxError) on Python 3.7+.
                c.commit(offsets=[tp1], asynchronous=False)
                # Read back the committed offset to verify the reset.
                print(c.committed([tp])[0].offset)
    finally:
        # Always close so the group's membership is cleanly revoked.
        c.close()


if __name__ == "__main__":
    reset_kafka_offset("test", "test")