python操作kafka
一、什么是kafka
kafka特性:
(1) 通過磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能.
(2) 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息.
(3) 支持通過Kafka服務器和消費機集群來分區消息.
(4) 支持Hadoop並行數據加載.
術語:
Broker: Kafka集群包含一個或多個服務器,這種服務器被稱為broker
Topic: 每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition: Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer: 負責發布消息到Kafka broker
Consumer: 消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group: 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)
二、安裝
在pypi.python.org有很多關於操作kafka的組件,我們選擇weight最高的kafka 1.3.5
有internet網的情況下執行如下命令安裝:
pip install kafka
easy_install kafka
三、按照官網的樣例,先跑一個應用
1、生產者:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
msg = "msg%d" % i
producer.send('test', msg)
producer.close()
2、消費者(簡單demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動后生產者、消費者可以正常消費。
3、消費者(消費群組)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴展提高處理能力
4、消費者(讀取目前最早可讀的消息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默認為latest
源碼定義:{'smallest': 'earliest', 'largest': 'latest'}
5、消費者(手動設置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])
print consumer.partitions_for_topic("test") #獲取test主題的分區信息
print consumer.topics() #獲取主題列表
print consumer.subscription() #獲取當前消費者訂閱的主題
print consumer.assignment() #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,從第5個偏移量消費
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
6、消費者(訂閱多個主題)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0')) #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
7、消費者(手動拉取消息)
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #從kafka獲取消息
print msg
time.sleep(1)
8、消費者(消息掛起與恢復)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print num
print consumer.paused() #獲取當前掛起的消費者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume..."
consumer.resume(TopicPartition(topic=u'test', partition=0))
print "resume......"
pause執行后,consumer不能讀取,直到調用resume后恢復。