如何使用Python讀寫Kafka?


這一篇文章里面,我們要使用的一個第三方庫叫做kafka-python。大家可以使用pip或者pipenv安裝它。下面兩種安裝方案,任選其一即可

python3 -m pip install kafka-python
pipenv install kafka-python

如下圖所示:

創建配置文件

由於生產者和消費者都需要連接Kafka,所以我單獨寫了一個配置文件config.py用來保存連接Kafka所需要的各個參數,而不是直接把這些參數Hard Code寫在代碼里面:

# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'

本文演示所用的Kafka由我司平台組的同事搭建,需要賬號密碼才能連接,所以我在配置文件中加上了USERNAMEPASSWORD兩項。你使用的Kafka如果沒有賬號和密碼,那么你只需要SERVERTOPIC即可。

創建生產者

代碼簡單到甚至不需要解釋。首先使用KafkaProducer類連接 Kafka,獲得一個生產者對象,然后往里面寫數據。

import json
import time
import datetime
import config
from kafka import KafkaProducer


producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())

for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

參數bootstrap_servers用於指定 Kafka 的服務器連接地址。

參數value_serializer用來指定序列化的方式。這里我使用 json 來序列化數據,從而實現我向 Kafka 傳入一個字典,Kafka 自動把它轉成 JSON 字符串的效果。

如下圖所示:

 

 注意,上圖中,我多寫了4個參數:

security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD

這四個參數是因為我這里需要通過密碼連接 Kafka 而加上的,如果你的 Kafka 沒有賬號密碼,就不需要這四個參數。

創建消費者

Kafka 消費者也需要連接 Kafka,首先使用KafkaConsumer類初始化一個消費者對象,然后循環讀取數據。代碼如下:

import config
from kafka import KafkaConsumer


consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

KafkaConsumer 的第一個參數用於指定 Topic。你可以把這個 Topic 理解成 Redis 的 Key。

bootstrap_servers用於指定 Kafka 服務器連接地址。

group_id這個參數后面的字符串可以任意填寫。如果兩個程序的Topicgroup_id相同,那么他們讀取的數據不會重復,兩個程序的Topic相同,但是group_id不同,那么他們各自消費全部數據,互不影響。

auto_offset_rest 這個參數有兩個值,earliestlatest,如果省略這個參數,那么默認就是latest。這個參數會單獨介紹。這里先略過。

連接好 Kafka 以后,直接對消費者對象使用 for 循環迭代,就能持續不斷獲取里面的數據了。

運行演示

運行兩個消費者程序和一個生產者程序,效果如下圖所示。

我們可以看到,兩個消費者程序讀取數據不重復,不遺漏。

當所有數據都消費完成以后,如果你把兩個消費者程序關閉,再運行其中一個,你會發現已經沒有數據會被打印出來了。

但如果你修改一下 group_id,程序又能正常從頭開始消費了,如下圖所示

很多人都會搞混的幾個地方

earliest 與 latest

在我們創建消費者對象的時候,有一個參數叫做 auto_offset_reset='earliest'。有人看到 earliestlatest,想當然地認為設置為 earliest,就是從 Topic 的頭往后讀,設置為 latest就是忽略之前的數據,從程序運行以后,新來的數據開始讀。

這種看法是不正確的。

auto_offset_reset這個參數,只有在一個group第一次運行的時候才有作用,從第二次運行開始,這個參數就失效了。

假設現在你的 Topic 里面有100個數據,你設置了一個全新的 group_id 為 test2auto_offset_reset設置為 earliest。那么當你的消費者運行的時候,Kafka 會先把你的 offset 設置為0,然后讓你從頭開始消費的。
 
假設現在你的 Topic 里面有100個數據,你設置了一個全新的 group_id 為test3auto_offset_reset設置為 latest那么當你的消費者運行的時候,Kafka 不會給你返回任何數據,消費者看起來就像卡住了一樣,但是 Kafka 會直接強制把前100條數據的狀態設置為已經被你消費的狀態。所以當前你的 offset 就直接是99了。直到生產者插入了一條新的數據,此時消費者才能讀取到。這條新的數據對應的 offset 就變成了100。

假設現在你的 Topic 里面有100個數據,你設置了一個全新的 group_id 為test4auto_offset_reset設置為 earliest。那么當你的消費者運行的時候,Kafka 會先把你的 offset 設置為0,然后讓你從頭開始消費的。等消費到第50條數據時,你把消費者程序關了,把auto_offset_reset設置為latest,再重新運行。此時消費者依然會接着從第51條數據開始讀取。不會跳過剩下的50條數據。

所以,auto_offset_reset的作用,是在你的 group 第一次運行,還沒有 offset 的時候,給你設定初始的 offset。而一旦你這個 group 已經有 offset 了,那么auto_offset_reset這個參數就不會再起作用了


partition 是如何分配的?

對於同一個 Topic 的同一個 Group:

假設你的 Topic 有10個 Partition,一開始你只啟動了1個消費者。那么這個消費者會輪換着從這10個Partition 中讀取數據。

當你啟動第二個消費者時,Kafka 會從第一個消費者手上搶走5個Partition,分給第二個消費者。於是兩個消費者各自讀5個 Partition。互不影響。

當第三個消費者又出現時,Kafka 從第一個消費者手上再搶走1個 Partition,從第二個消費者手上搶走2個 Partition 給第三個消費者。於是,消費者1有4個 Partition,消費者2有3個 Partition,消費者3有3個 Partiton,互不影響。

當你有10個消費者一起消費時,每個消費者讀取一個 Partition,互不影響。

當第11個消費者出現時,它由於分配不到 Partition,所以它什么都讀不到。

所以在上一篇文章中,我說,在同一個 Topic,同一個 Group 中,你有多少個 Partiton,就能起多少個進程同時消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM