來源於 https://www.cnblogs.com/small-office/p/9399907.html
1 1、先看最簡單的場景,生產者生產消息,消費者接收消息,下面是生產者的簡單代碼。 2 -------------------------------------------------------------------------------- 3 #!/usr/bin/env python 4 # -*- coding: utf-8 -*- 5 import json 6 from kafka import KafkaProducer 7 8 producer = KafkaProducer(bootstrap_servers='xxxx:x') 9 10 msg_dict = { 11 "sleep_time": 10, 12 "db_config": { 13 "database": "test_1", 14 "host": "xxxx", 15 "user": "root", 16 "password": "root" 17 }, 18 "table": "msg", 19 "msg": "Hello World" 20 } 21 msg = json.dumps(msg_dict) 22 producer.send('test_rhj', msg, partition=0) 23 producer.close() 24 -------------------------------------------------------------------------------- 25 下面是消費者的簡單代碼: 26 from kafka import KafkaConsumer 27 28 consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x']) 29 for msg in consumer: 30 recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) 31 print recv 32 33 -------------------------------------------------------------------------------- 34 下面是結果:
2、如果想要完成負載均衡,就需要知道kafka的分區機制,同一個主題,可以為其分區,在生產者不指定分區的情況,kafka會將多個消息分發到不同的分區,消費者訂閱時候如果不指定服務組,
會收到所有分區的消息,如果指定了服務組,則同一服務組的消費者會消費不同的分區,如果2個分區兩個消費者的消費者組消費,則,每個消費者消費一個分區,如果有三個消費者的服務組,
則會出現一個消費者消費不到數據;如果想要消費同一分區,則需要用不同的服務組。以此為原理,我們對消費者做如下修改:
——————————————————————————————————— from kafka import KafkaConsumer consumer = KafkaConsumer('test_rhj', group_id='123456', bootstrap_servers=['10.43.35.25:4531']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv ------------------------------------------------------------------------------------ 然后我們開兩個消費者進行消費,生產者分別往0分區和1分區發消息結果如下,可以看到,一個消費者只能消費0分區,另一個只能消費1分區:


3、kafka提供了偏移量的概念,允許消費者根據偏移量消費之前遺漏的內容,這基於kafka名義上的全量存儲,可以保留大量的歷史數據,歷史保存時間是可配置的,一般是7天,如果偏移量定位到了已刪除的位置那也會有問題,但是這種情況可能很小;每個保存的數據文件都是以偏移量命名的,當前要查的偏移量減去文件名就是數據在該文件的相對位置。要指定偏移量消費數據,需要指定該消費者要消費的分區,否則代碼會找不到分區而無法消費,代碼如下:
from kafka import KafkaConsumer from kafka.structs import TopicPartition consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)]) print consumer.partitions_for_topic("test_rhj") # 獲取test主題的分區信息 print consumer.assignment() print consumer.beginning_offsets(consumer.assignment()) consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv ----------------------------------------------------------------------------------- 因為指定的便宜量為0,所以從一開始插入的數據都可以查到,而且因為指定了分區,指定的分區結果都可以消費,結果如下:

4、有時候,我們並不需要實時獲取數據,因為這樣可能會造成性能瓶頸,我們只需要定時去獲取隊列里的數據然后批量處理就可以,這種情況,我們可以選擇主動拉取數據
from kafka import KafkaConsumer import time consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531']) consumer.subscribe(topics=('test_rhj',)) index = 0 while True: msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息 print msg time.sleep(2) index += 1 print '--------poll index is %s----------' % index ----------------------------------------------------------------------------------- 結果如下,可以看到,每次拉取到的都是前面生產的數據,可能是多條的列表,也可能沒有數據,如果沒有數據,則拉取到的為空:
