這一篇文章里面,我們要使用的一個第三方庫叫做kafka-python。大家可以使用pip或者pipenv安裝它。下面兩種安裝方案,任選其一即可
python3 -m pip install kafka-python
pipenv install kafka-python
如下圖所示:

創建配置文件
由於生產者和消費者都需要連接Kafka,所以我單獨寫了一個配置文件config.py用來保存連接Kafka所需要的各個參數,而不是直接把這些參數Hard Code寫在代碼里面:
# config.py SERVER = '123.45.32.11:1234' USERNAME = 'kingname' PASSWORD = 'kingnameisgod' TOPIC = 'howtousekafka'
本文演示所用的Kafka由我司平台組的同事搭建,需要賬號密碼才能連接,所以我在配置文件中加上了USERNAME和PASSWORD兩項。你使用的Kafka如果沒有賬號和密碼,那么你只需要SERVER和TOPIC即可。
創建生產者
代碼簡單到甚至不需要解釋。首先使用KafkaProducer類連接 Kafka,獲得一個生產者對象,然后往里面寫數據。
import json import time import datetime import config from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=config.SERVER, value_serializer=lambda m: json.dumps(m).encode()) for i in range(100): data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} producer.send(config.TOPIC, data) time.sleep(1)
參數bootstrap_servers用於指定 Kafka 的服務器連接地址。
參數value_serializer用來指定序列化的方式。這里我使用 json 來序列化數據,從而實現我向 Kafka 傳入一個字典,Kafka 自動把它轉成 JSON 字符串的效果。
注意,上圖中,我多寫了4個參數:
security_protocol="SASL_PLAINTEXT" sasl_mechanism="PLAIN" sasl_plain_username=config.USERNAME sasl_plain_password=config.PASSWORD
這四個參數是因為我這里需要通過密碼連接 Kafka 而加上的,如果你的 Kafka 沒有賬號密碼,就不需要這四個參數。
創建消費者
Kafka 消費者也需要連接 Kafka,首先使用KafkaConsumer類初始化一個消費者對象,然后循環讀取數據。代碼如下:
import config from kafka import KafkaConsumer consumer = KafkaConsumer(config.TOPIC, bootstrap_servers=config.SERVER, group_id='test', auto_offset_reset='earliest') for msg in consumer: print(msg.value)
KafkaConsumer 的第一個參數用於指定 Topic。你可以把這個 Topic 理解成 Redis 的 Key。
bootstrap_servers用於指定 Kafka 服務器連接地址。
group_id這個參數后面的字符串可以任意填寫。如果兩個程序的Topic與group_id相同,那么他們讀取的數據不會重復,兩個程序的Topic相同,但是group_id不同,那么他們各自消費全部數據,互不影響。
auto_offset_rest 這個參數有兩個值,earliest和latest,如果省略這個參數,那么默認就是latest。這個參數會單獨介紹。這里先略過。
連接好 Kafka 以后,直接對消費者對象使用 for 循環迭代,就能持續不斷獲取里面的數據了。
運行演示
運行兩個消費者程序和一個生產者程序,效果如下圖所示。

我們可以看到,兩個消費者程序讀取數據不重復,不遺漏。
當所有數據都消費完成以后,如果你把兩個消費者程序關閉,再運行其中一個,你會發現已經沒有數據會被打印出來了。
但如果你修改一下 group_id,程序又能正常從頭開始消費了,如下圖所示
很多人都會搞混的幾個地方
earliest 與 latest
auto_offset_reset='earliest'。有人看到
earliest與
latest,想當然地認為設置為
earliest,就是從 Topic 的頭往后讀,設置為
latest就是忽略之前的數據,從程序運行以后,新來的數據開始讀。
這種看法是不正確的。
auto_offset_reset這個參數,只有在一個group第一次運行的時候才有作用,從第二次運行開始,這個參數就失效了。
test2。
auto_offset_reset設置為
earliest。那么當你的消費者運行的時候,Kafka 會先把你的
offset 設置為0,然后讓你從頭開始消費的。
test3。
auto_offset_reset設置為
latest。那么當你的消費者運行的時候,Kafka 不會給你返回任何數據,消費者看起來就像卡住了一樣,但是
Kafka 會直接強制把前100條數據的狀態設置為已經被你消費的狀態。所以當前你的 offset 就直接是99了。直到生產者插入了一條新的數據,此時消費者才能讀取到。這條新的數據對應的 offset 就變成了100。
假設現在你的 Topic 里面有100個數據,你設置了一個全新的 group_id 為test4。auto_offset_reset設置為 earliest。那么當你的消費者運行的時候,Kafka 會先把你的 offset 設置為0,然后讓你從頭開始消費的。等消費到第50條數據時,你把消費者程序關了,把auto_offset_reset設置為latest,再重新運行。此時消費者依然會接着從第51條數據開始讀取。不會跳過剩下的50條數據。
所以,auto_offset_reset的作用,是在你的 group 第一次運行,還沒有 offset 的時候,給你設定初始的 offset。而一旦你這個 group 已經有 offset 了,那么auto_offset_reset這個參數就不會再起作用了。
partition 是如何分配的?
對於同一個 Topic 的同一個 Group:
假設你的 Topic 有10個 Partition,一開始你只啟動了1個消費者。那么這個消費者會輪換着從這10個Partition 中讀取數據。
當你啟動第二個消費者時,Kafka 會從第一個消費者手上搶走5個Partition,分給第二個消費者。於是兩個消費者各自讀5個 Partition。互不影響。
當第三個消費者又出現時,Kafka 從第一個消費者手上再搶走1個 Partition,從第二個消費者手上搶走2個 Partition 給第三個消費者。於是,消費者1有4個 Partition,消費者2有3個 Partition,消費者3有3個 Partiton,互不影響。
當你有10個消費者一起消費時,每個消費者讀取一個 Partition,互不影響。
當第11個消費者出現時,它由於分配不到 Partition,所以它什么都讀不到。
所以在上一篇文章中,我說,在同一個 Topic,同一個 Group 中,你有多少個 Partiton,就能起多少個進程同時消費。
