該博文方法有問題,正確方案在http://www.cnblogs.com/dplearning/p/7992994.html
背景:
搭建了一個kafka集群,建立了topic test,用group_id ttt 消耗topic.
但問題是,我消費的處理太慢了,導致了上百萬數據的擠壓,即offset滯后上百萬
現在,想放棄上一次的任務,執行新的任務。但是topic名稱和group_id不能變化。
想了幾個方案,覺得重置offset是最符合我需求的。
import os from confluent_kafka import Consumer, KafkaError, TopicPartition # 獲取最大logsize def get_logsize(): cmd = "/usr/local/share/applications/kafka/kafka_2.11-0.11.0.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ttt --zookeeper x.x.x.x:2181 --topic test" res = os.popen(cmd).read() logsize = res.split("\n")[1].split()[4] return int(logsize) c = Consumer({'bootstrap.servers': 'x.x.x.x:9092', 'group.id': 'ttt', 'default.topic.config': {'auto.offset.reset': 'largest'}}) c.subscribe(['test']) logsize = get_logsize() tp = TopicPartition('test', 0, logsize) c.commit(offsets = [tp]) # 直接將offset置為logsize,跳過未消費的數據
我沒找到怎么直接通過confluent_kafka獲取logsize.
https://docs.confluent.io/3.0.0/clients/confluent-kafka-python/#configuration 有提到OFFSET_END,但是我用了后OFFSET_END值為-1,並沒有將offset置為最新。
只好用命令行獲取了.............
方案二(放棄):刪除topic以及所有相關數據。
上網查了查,太復雜了。要刪zookeeper和kafka的log文件,還要重啟服務。
這跟我想的不一樣啊,難道kafka就沒有直接清除數據的指令嗎.............
整體感受:kafka的API比我想象中少很多,或者是我沒找到....