python+kafka,從指定位置消費數據


# @staticmethod
def get_kafka_reviews(self):
# print type(self.bootstrap_servers)
consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
consumer.subscribe(topics=(self.topics)) #訂閱要消費的主題

# print consumer.topics()
# print "+++++++",consumer.position(TopicPartition(topic=u'ctripapi_duplicateddata_review', partition=1)) #獲取當前主題的最新偏移量

review_list =[]
for message in consumer:
print '====%s:%d:%d:key-%s value=%s=='%(message.topic,message.partition,message.offset,message.key,message.value)
review_list.append(message.value)
if len(review_list)==self.num: #先取100條來消費
break
return review_list




解釋:
consumer = kafka.KafkaConsumer(bootstrap_servers=[self.bootstrap_servers],group_id='wm_group',auto_offset_reset='latest', enable_auto_commit=False)
自動提交位移設為flase, 默認為取最新的偏移量,重新建立一個guou_id,這樣就實現了不影響別的應用程序消費數據,又能消費到最新數據,實現預警(先於用戶發現)的目的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM