python kafka


kafka簡介(摘自百度百科)

簡介:

kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消費。

特性:
通過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
高吞吐量[2]  :即使是非常普通的硬件Kafka也可以支持每秒數百萬[2]  的消息
支持通過Kafka服務器和消費機集群來分區消息
支持Hadoop並行數據加載

術語:
Broker
Kafka集群包含一個或多個服務器,這種服務器被稱為broker
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition
Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer
負責發布消息到Kafka broker
Consumer
消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。

一、安裝

在pypi.python.org有很多關於操作kafka的組件,我們選擇weight最高的kafka 1.3.5
1、有網的情況下執行如下命令安裝:
pip install kafka
easy_install kafka

2、無網的情況下把源碼下載下來,上傳到需要安裝的主機
壓縮包:kafka-1.3.5.tar.gz
解壓: tar xvf kafka-1.3.5.tar.gz
執行安裝命令:   cd kafka-1.3.5   
               python setup.py install
               
如安裝報依賴錯誤,需要把依賴的組件也下載下來,然后進行安裝,同樣的方法,不贅述!

二、按照官網的樣例,先跑一個應用

1、生產者:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])  #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()

2、消費者(簡單demo):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

啟動后生產者、消費者可以正常消費。

3、消費者(消費群組)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴展提高處理能力

4、消費者(讀取目前最早可讀的消息)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
                         
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默認為latest
源碼定義:{'smallest': 'earliest', 'largest': 'latest'}

5、消費者(手動設置偏移量)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',
                         bootstrap_servers=['172.21.10.136:9092'])

print consumer.partitions_for_topic("test")  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,從第5個偏移量消費
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

6、消費者(訂閱多個主題)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

7、消費者(手動拉取消息)

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   #從kafka獲取消息
    print msg
    time.sleep(1)

8、消費者(消息掛起與恢復)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #獲取當前掛起的消費者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"

pause執行后,consumer不能讀取,直到調用resume后恢復。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM