其實很早就是用kafka了,但是一直都用的非常簡單,最近寫一個小功能的時候又要用到kafka,於是就花點時間好好看了一下網上關於kafka的一些文檔和博客,發現了一個很不錯的博客,做個記錄和分享。
原文鏈接: https://www.cnblogs.com/rexcheny/articles/9463979.html
作者好像是阿里員工,他在這一篇博客中對於一個常用的參數都做了詳細的解釋,並寫了一個類可以直接使用,非常感謝。
同步發表於個人站點:http://panzhixiang.cn
單線程生產者
說是單線程,其實並不是,你啟動一個生產者其實是2個線程,后台有一個IO線程用於真正發送消息出去,前台有一個線程用於把消息發送到本地緩沖區。
#!/usr/bin/env python
# Author: rex.cheny
# E-mail: rex.cheny@outlook.com
import time
import random
import sys
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
import json
"""
KafkaProducer是發布消息到Kafka集群的客戶端,它是線程安全的並且共享單一生產者實例。生產者包含一個帶有緩沖區的池,
用於保存還沒有傳送到Kafka集群的消息記錄以及一個后台IO線程,該線程將這些留在緩沖區的消息記錄發送到Kafka集群中。
"""
"""
KafkaProducer構造函數參數解釋
- acks 0表示發送不理睬發送是否成功;1表示需要等待leader成功寫入日志才返回;all表示所有副本都寫入日志才返回
- buffer_memory 默認33554432也就是32M,該參數用於設置producer用於緩存消息的緩沖區大小,如果采用異步發送消息,那么
生產者啟動后會創建一個內存緩沖區用於存放待發送的消息,然后由專屬線程來把放在緩沖區的消息進行真正發送,
如果要給生產者要給很多分區發消息那么就需要考慮這個參數的大小防止過小降低吞吐量
- compression_type 是否啟用壓縮,默認是none,可選類型為gzip、lz4、snappy三種。壓縮會降低網絡IO但是會增加生產者端的CPU
消耗。另外如果broker端的壓縮設置和生產者不同那么也會給broker帶來重新解壓縮和重新壓縮的CPU負擔。
- retries 重試次數,當消息發送失敗后會嘗試幾次重發。默認為0,一般考慮到網絡抖動或者分區的leader切換,而不是服務端
真的故障所以可以設置重試3次。
- retry_backoff_ms 每次重試間隔多少毫秒,默認100毫秒。
- max_in_flight_requests_per_connection 生產者會將多個發送請求緩存在內存中,默認是5個,如果你開啟了重試,也就是設置了
retries參數,那么將可能導致針對於同一分區的消息出現順序錯亂。為了防止這種情況
需要把該參數設置為1,來保障同分區的消息順序。
- batch_size 對於調優生產者吞吐量和延遲性能指標有重要的作用。buffer_memeory可以看做池子,而這個batch_size可以看做池子里
裝有消息的小盒子。這個值默認16384也就是16K,其實不大。生產者會把發往同一個分區的消息放在一個batch中,當batch
滿了就會發送里面的消息,但是也不一定非要等到滿了才會發。這個數值大那么生產者吞吐量高但是性能低因為盒子太大占用內存
發送的時候這個數據量也就大。如果你設置成1M,那么顯然生產者的吞吐量要比16K高的多。
- linger_ms 上面說batch沒有填滿也可以發送,那顯然有一個時間控制,就是這個參數,默認是0毫秒,這個參數就是用於控制消息發送延遲
多久的。默認是立即發送,無需關系batch是否填滿。大多數場景我們希望立即發送,但是這也降低了吞吐量。
- max_request_size 最大請求大小,可以理解為一條消息記錄的最大大小,默認是1048576字節。
- request_timeout_ms 生產者發送消息后,broker需要在規定時間內將處理結果返回給生產者,那個這個時間長度就是這個參數
控制的,默認30000,也就是30秒。如果broker在30秒內沒有給生產者響應,那么生產者就會認為請求超時,並在回調函數
中進行特殊處理,或者進行重試。
"""
class Producer(object):
def __init__(self, KafkaServerList=['127.0.0.1:9092'], ClientId="Procucer01", Topic='Test'):
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"acks": 1,
"buffer_memory": 33554432,
'compression_type': None,
"retries": 3,
"batch_size": 1048576,
"linger_ms": 100,
"key_serializer": lambda m: json.dumps(m).encode('utf-8'),
"value_serializer": lambda m: json.dumps(m).encode('utf-8'),
}
self._topic = Topic
try:
self._producer = KafkaProducer(**self._kwargs)
except Exception as err:
print(err)
def _onSendSucess(self, record_metadata):
"""
異步發送成功回調函數,也就是真正發送到kafka集群且成功才會執行。發送到緩沖區不會執行回調方法。
:param record_metadata:
:return:
"""
print("發送成功")
print("被發往的主題:", record_metadata.topic)
print("被發往的分區:", record_metadata.partition)
print("隊列位置:", record_metadata.offset) # 這個偏移量是相對偏移量,也就是相對起止位置,也就是隊列偏移量。
def _onSendFailed(self):
print("發送失敗")
def sendMessage(self, value=None, partition=None):
if not value:
return None
# 發送的消息必須是序列化后的,或者是字節
# message = json.dumps(msg, encoding='utf-8', ensure_ascii=False)
kwargs = {
"value": value, # value 必須必須為字節或者被序列化為字節,由於之前我們初始化時已經通過value_serializer來做了,所以我上面的語句就注釋了
"key": None, # 與value對應的鍵,可選,也就是把一個鍵關聯到這個消息上,KEY相同就會把消息發送到同一分區上,所以如果有這個要求就可以設置KEY,也需要序列化
"partition": partition # 發送到哪個分區,整型。如果不指定將會自動分配。
}
try:
# 異步發送,發送到緩沖區,同時注冊兩個回調函數,一個是發送成功的回調,一個是發送失敗的回調。
# send函數是有返回值的是RecordMetadata,也就是記錄的元數據,包括主題、分區、偏移量
future = self._producer.send(self._topic, **kwargs).add_callback(self._onSendSucess).add_errback(self._onSendFailed)
print("發送消息:", value)
# 注冊回調也可以這樣寫,上面的寫法就是為了簡化
# future.add_callback(self._onSendSucess)
# future.add_errback(self._onSendFailed)
except KafkaTimeoutError as err:
print(err)
except Exception as err:
print(err)
def closeConnection(self, timeout=None):
# 關閉生產者,可以指定超時時間,也就是等待關閉成功最多等待多久。
self._producer.close(timeout=timeout)
def sendNow(self, timeout=None):
# 調用flush()函數可以放所有在緩沖區的消息記錄立即發送,即使ligner_ms值大於0.
# 這時候后台發送消息線程就會開始立即發送消息並且阻塞在這里,等待消息發送成功,當然是否阻塞取決於acks的值。
# 如果不調用flush函數,那么什么時候發送消息取決於ligner_ms或者batch任意一個條件滿足就會發送。
try:
self._producer.flush(timeout=timeout)
except KafkaTimeoutError as err:
print(err)
except Exception as err:
print(err)
def main():
p = Producer(KafkaServerList=["172.16.42.156:9092"], ClientId="Procucer01", Topic="TESTTOPIC")
for i in range(10):
time.sleep(1)
closePrice = random.randint(1, 500)
msg = {
"Publisher": "Procucer01",
"股票代碼": 60000 + i
"昨日收盤價": closePrice,
"今日開盤價": 0,
"今日收盤價": 0,
}
p.sendMessage(value=msg)
# p.sendNow()
p.closeConnection()
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
單線程消費者(手動拉取消息)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from kafka import KafkaConsumer
import json
class Consumer(object):
def __init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup', ClientId="Test", Topics=['Test',]):
"""
用於設置消費者配置信息,這些配置項可以從源碼中找到,下面為必要參數。
:param KafkaServerList: kafka服務器IP:PORT 列表
:param GroupID: 消費者組ID
:param ClientId: 消費者名稱
:param Topic: 主題
"""
"""
初始化一個消費者實例,消費者不是線程安全的,所以建議一個線程實現一個消費者,而不是一個消費者讓多個線程共享
下面這些是可選參數,可以在初始化KafkaConsumer實例的時候傳遞進去
enable_auto_commit 是否自動提交,默認是true
auto_commit_interval_ms 自動提交間隔毫秒數
auto_offset_reset="earliest" 重置偏移量,earliest移到最早的可用消息,latest最新的消息,默認為latest
"""
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"group_id": GroupID,
"enable_auto_commit": False,
"auto_offset_reset": "latest",
"key_deserializer": lambda m: json.loads(m.decode('utf-8')),
"value_deserializer": lambda m: json.loads(m.decode('utf-8')),
}
try:
self._consumer = KafkaConsumer(**self._kwargs)
self._consumer.subscribe(topics=(Topics))
except Exception as err:
print("Consumer init failed, %s" % err)
def consumeMsg(self):
try:
while True:
data = self._consumer.poll(timeout_ms=5, max_records=100) # 拉取消息,字典類型
if data:
for key in data:
consumerrecord = data.get(key)[0] # 返回的是ConsumerRecord對象,可以通過字典的形式獲取內容。
if consumerrecord != None:
# 消息消費邏輯
message = {
"Topic": consumerrecord.topic,
"Partition": consumerrecord.partition,
"Offset": consumerrecord.offset,
"Key": consumerrecord.key,
"Value": consumerrecord.value
}
print(message)
# 消費邏輯執行完畢后在提交偏移量
self._consumer.commit()
else:
print("%s consumerrecord is None." % key)
except Exception as err:
print(err)
def main():
try:
c = Consumer(KafkaServerList=['192.168.51.193:9092'], Topics=['EEE888'])
c.consumeMsg()
except Exception as err:
print(err)
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
非手動拉取消息
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from kafka import KafkaConsumer
import json
class Consumer(object):
def __init__(self, KafkaServerList=['172.16.48.171:9092'], GroupID='TestGroup', ClientId="Test", Topics=['Test',]):
"""
用於設置消費者配置信息,這些配置項可以從源碼中找到,下面為必要參數。
:param KafkaServerList: kafka服務器IP:PORT 列表
:param GroupID: 消費者組ID
:param ClientId: 消費者名稱
:param Topic: 主題
"""
"""
初始化一個消費者實例,消費者不是線程安全的,所以建議一個線程實現一個消費者,而不是一個消費者讓多個線程共享
下面這些是可選參數,可以在初始化KafkaConsumer實例的時候傳遞進去
enable_auto_commit 是否自動提交,默認是true
auto_commit_interval_ms 自動提交間隔毫秒數
auto_offset_reset="earliest" 重置偏移量,earliest移到最早的可用消息,latest最新的消息,默認為latest
"""
self._kwargs = {
"bootstrap_servers": KafkaServerList,
"client_id": ClientId,
"group_id": GroupID,
"enable_auto_commit": False,
"auto_offset_reset": "latest",
"key_deserializer": lambda m: json.loads(m.decode('utf-8')),
"value_deserializer": lambda m: json.loads(m.decode('utf-8')),
}
try:
self._consumer = KafkaConsumer(**self._kwargs)
self._consumer.subscribe(topics=(Topics))
except Exception as err:
print("Consumer init failed, %s" % err)
def consumeMsg(self):
try:
while True:
for consumerrecord in self._consumer:
if consumerrecord:
message = {
"Topic": consumerrecord.topic,
"Partition": consumerrecord.partition,
"Offset": consumerrecord.offset,
"Key": consumerrecord.key,
"Value": consumerrecord.value
}
print(message)
self._consumer.commit()
except Exception as err:
print(err)
def main():
try:
c = Consumer(KafkaServerList=['192.168.51.193:9092'], Topics=['EEE888'])
c.consumeMsg()
except Exception as err:
print(err)
if __name__ == "__main__":
try:
main()
finally:
sys.exit()
Python API的幫助文檔:https://kafka-python.readthedocs.io/en/master/usage.html