Apache Kafka 是什么?
Kafka 是一個開源的分布式流處理平台,其簡化了不同數據系統的集成。流指的是一個數據管道,應用能夠通過流不斷地接收數據。Kafka 作為流處理系統主要有兩個用處:
- 數據集成: Kafka 捕捉事件流或數據變化流,並將這些數據送給其它數據系統,如關系型數據庫,鍵值對數據庫或者數據倉庫。
- 流處理:Kafka接收事件流並保存在一個只能追加的隊列里,該隊列稱為日志(log)。日志里的信息是不可變的,因此支持連續實時的數據處理和流轉換,並使結果在系統級別可訪問。
相比於其它技術,Kafka 擁有更高的吞吐量,內置分區,副本和容錯率。這些使得 Kafka 成為大規模消息處理應用的良好解決方案。
Kafka 系統有三個主要的部分:
- 生產者(Producer): 產生原始數據的服務。
- 中間人(Broker): Kafka 是生產者和消費者之間的中間人,它使用API來獲取和發布數據。
- 消費者(Consumer): 使用中間人發布的數據的服務。
安裝 Kafka
見 Kafka 簡單實驗一
配置環境
我們的項目將包括:
生產者:將字符串發送給 Kafka 消費者: 獲取數據並展示在終端窗口中 Kafka: 作為中間人
安裝需要的依賴包
pip install kafka-python
創建生產者
生產者是給 Kafka 中間人發送消息的服務。值得注意的是,生產者並不關注最終消費或加載數據的消費者。 創建生產者: 創建一個 producer.py 文件並添加如下代碼:
import time from kafka import SimpleProducer, KafkaClient # connect to Kafka kafka = KafkaClient('localhost:9092') producer = SimpleProducer(kafka) # Assign a topic topic = 'my-topic'
創建消息:
循環生成1到100之間的數字
發送消息:
Kafka 消息是二進制字符串格式(byte)
以下是完整的生產者代碼:
import time from kafka import SimpleProducer, KafkaClient # connect to Kafka kafka = KafkaClient('localhost:9092') producer = SimpleProducer(kafka) # Assign a topic topic = 'my-topic' def test(): print('begin') n = 1 while (n<=100): producer.send_messages(topic, str(n)) print "send" + str(n) n += 1 time.sleep(0.5) print('done') if __name__ == '__main__': test()
創建消費者
消費者監聽並消費來自 Kafka 中間人的消息。我們的消費者應該監聽 my-topic 主題的消息並將消息展示。
以下是消費者代碼(consumer.py):
from kafka import KafkaConsumer #connect to Kafka server and pass the topic we want to consume consumer = KafkaConsumer('my-topic') print "begin" for msg in consumer: print msg
運行項目
確保 Kafka 在運行
打開兩個終端,在第一個終端中運行消費者:
$ python consumer.py
在第二個終端運行生產者:
$ python producer.py