python操作kafka


python操作kafka

一、什么是kafka

kafka特性:
(1) 通過磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能.
(2) 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息.
(3) 支持通過Kafka服務器和消費機集群來分區消息.
(4) 支持Hadoop並行數據加載.

術語:

Broker: Kafka集群包含一個或多個服務器,這種服務器被稱為broker
Topic: 每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition: Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer: 負責發布消息到Kafka broker
Consumer: 消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group: 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)

二、安裝

在pypi.python.org有很多關於操作kafka的組件,我們選擇weight最高的kafka 1.3.5
有internet網的情況下執行如下命令安裝:

pip install kafka
easy_install kafka

三、按照官網的樣例,先跑一個應用

1、生產者:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['xxx.xx.xx.xxx:9092']) #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
    producer.close()
2、消費者(簡單demo):
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))

啟動后生產者、消費者可以正常消費。

3、消費者(消費群組)
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['xxx.xx.xx.xx:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))

啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴展提高處理能力

4、消費者(讀取目前最早可讀的消息)
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默認為latest
源碼定義:{'smallest': 'earliest', 'largest': 'latest'}
5、消費者(手動設置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',
bootstrap_servers=['xxx.xx.xx.xxx:9092'])

print consumer.partitions_for_topic("test") #獲取test主題的分區信息
print consumer.topics() #獲取主題列表
print consumer.subscription() #獲取當前消費者訂閱的主題
print consumer.assignment() #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,從第5個偏移量消費
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
6、消費者(訂閱多個主題)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0')) #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    message.offset, message.key,
    message.value))
7、消費者(手動拉取消息)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5) #從kafka獲取消息
    print msg
    time.sleep(1)
8、消費者(消息掛起與恢復)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['xxx.xx.xx.xxx:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused() #獲取當前掛起的消費者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"
pause執行后,consumer不能讀取,直到調用resume后恢復。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM