Kafka 簡單實驗二(Python實現簡單生產者消費者)


Apache Kafka 是什么?

Kafka 是一個開源的分布式流處理平台,其簡化了不同數據系統的集成。流指的是一個數據管道,應用能夠通過流不斷地接收數據。Kafka 作為流處理系統主要有兩個用處:

  1. 數據集成: Kafka 捕捉事件流或數據變化流,並將這些數據送給其它數據系統,如關系型數據庫,鍵值對數據庫或者數據倉庫。
  2. 流處理:Kafka接收事件流並保存在一個只能追加的隊列里,該隊列稱為日志(log)。日志里的信息是不可變的,因此支持連續實時的數據處理和流轉換,並使結果在系統級別可訪問。

相比於其它技術,Kafka 擁有更高的吞吐量,內置分區,副本和容錯率。這些使得 Kafka 成為大規模消息處理應用的良好解決方案。

Kafka 系統有三個主要的部分:

  1. 生產者(Producer): 產生原始數據的服務。
  2. 中間人(Broker): Kafka 是生產者和消費者之間的中間人,它使用API來獲取和發布數據。
  3. 消費者(Consumer): 使用中間人發布的數據的服務。

安裝 Kafka

  見 Kafka 簡單實驗一

配置環境

我們的項目將包括:

  生產者:將字符串發送給 Kafka   消費者: 獲取數據並展示在終端窗口中  Kafka: 作為中間人

安裝需要的依賴包

pip install kafka-python

創建生產者

生產者是給 Kafka 中間人發送消息的服務。值得注意的是,生產者並不關注最終消費或加載數據的消費者。 創建生產者: 創建一個 producer.py 文件並添加如下代碼:

import time
from kafka import SimpleProducer, KafkaClient

#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

創建消息:

  循環生成1到100之間的數字

發送消息:

  Kafka 消息是二進制字符串格式(byte)

以下是完整的生產者代碼:

import time
from kafka import SimpleProducer, KafkaClient

#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

def test():
    print('begin')
    n = 1
    while (n<=100):
        producer.send_messages(topic, str(n))
        print "send" + str(n)
        n += 1
        time.sleep(0.5)
    print('done')

if __name__ == '__main__':
    test()

創建消費者

消費者監聽並消費來自 Kafka 中間人的消息。我們的消費者應該監聽 my-topic 主題的消息並將消息展示。

以下是消費者代碼(consumer.py):

from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic')

print "begin"
for msg in consumer:
    print msg

運行項目

確保 Kafka 在運行
打開兩個終端,在第一個終端中運行消費者:

$ python consumer.py

在第二個終端運行生產者:

$ python producer.py


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM