【kafka】設置指定topic和group_id消耗的offset


該博文方法有問題,正確方案在http://www.cnblogs.com/dplearning/p/7992994.html

 

背景:

搭建了一個kafka集群,建立了topic test,用group_id  ttt 消耗topic.

但問題是,我消費的處理太慢了,導致了上百萬數據的擠壓,即offset滯后上百萬

現在,想放棄上一次的任務,執行新的任務。但是topic名稱和group_id不能變化。

 

想了幾個方案,覺得重置offset是最符合我需求的。

import os
from confluent_kafka import Consumer, KafkaError, TopicPartition

# 獲取最大logsize
def get_logsize():
    cmd = "/usr/local/share/applications/kafka/kafka_2.11-0.11.0.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ttt --zookeeper x.x.x.x:2181 --topic test"
    res = os.popen(cmd).read()
    logsize = res.split("\n")[1].split()[4]
    return int(logsize)

c = Consumer({'bootstrap.servers': 'x.x.x.x:9092',
              'group.id': 'ttt',
              'default.topic.config': {'auto.offset.reset': 'largest'}})
c.subscribe(['test'])
logsize = get_logsize()
tp = TopicPartition('test', 0, logsize)
c.commit(offsets = [tp])  # 直接將offset置為logsize,跳過未消費的數據

我沒找到怎么直接通過confluent_kafka獲取logsize.

https://docs.confluent.io/3.0.0/clients/confluent-kafka-python/#configuration 有提到OFFSET_END,但是我用了后OFFSET_END值為-1,並沒有將offset置為最新。

只好用命令行獲取了.............

 

 

方案二(放棄):刪除topic以及所有相關數據。

上網查了查,太復雜了。要刪zookeeper和kafka的log文件,還要重啟服務。

這跟我想的不一樣啊,難道kafka就沒有直接清除數據的指令嗎.............

 

 

 

整體感受:kafka的API比我想象中少很多,或者是我沒找到....


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM