Python 使用python-kafka類庫開發kafka生產者&消費者&客戶端


使用python-kafka類庫開發kafka生產者&消費者&客戶端

  By: 授客 QQ:1033553122

 

 

 

1.測試環境

python 3.4

 

zookeeper-3.4.13.tar.gz

下載地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下載地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下載地址1:

http://kafka.apache.org/downloads.html

下載地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下載地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

說明:實踐中發現,pip版本比較舊的話,沒法安裝whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下載地址1:

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下載地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

說明:

kafka-python支持gzip壓縮/解壓縮。如果要消費lz4方式壓縮的消息,則需要安裝python-lz4,如果要支持snappy方式壓縮/解壓縮則需要安裝,否則可能會報錯:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

構建生產者對象時,可通過compression_type 參數指定由對應生產者生產的消息數據的壓縮方式,或者在producer.properties配置中配置compression.type參數。

 

參考鏈接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代碼實踐

生產者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到單條消息發送完或者超時

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息發送到網絡

# 注意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json數據

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息記錄攜帶header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 獲取性能數據(注意,實踐發現分區較多的情況下,該操作比較耗時

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:

進入到配置目錄(config),編輯server.properties文件,

查找並設置listener,配置監聽端口,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用參數說明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

key_serializer (可調用對象) –用於轉換用戶提供的key值為字節,必須返回字節數據。 如果為None,則等同調用f(key)。 默認值: None.

 

value_serializer(可調用對象) – 用於轉換用戶提供的value消息值為字節,必須返回字節數據。 如果為None,則等同調用f(value)。 默認值: None.

 

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 設置消息將要發布到的主題,即消息所屬主題

 

value(可選) – 消息內容,必須為字節數據,或者通過value_serializer序列化后的字節數據。如果為None,則key必填,消息等同於“刪除”。( If value is None, key is required and message acts as a ‘delete’)

 

partition (int, 可選) – 指定分區。如果未設置,則使用配置的partitioner

 

key (可選) – 和消息對應的key,可用於決定消息發送到哪個分區。如果平partition為None,則相同key的消息會被發布到相同分區(但是如果key為None,則隨機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為字節數據或者通過配置的key_serializer序列化后的字節數據.

 

headers (可選) – 設置消息header,header-value鍵值對表示的list。list項為元組:格式 (str_header,bytes_value)

 

timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。默認為當前時間

 

函數返回FutureRecordMetadata類型的RecordMetadata數據

 

flush(timeout=None)

發送所有可以立即獲取的緩沖消息(即時linger_ms大於0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。

 

注意:flush調用不保證記錄發送成功

 

metrics(raw=False)

獲取生產者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

 

注:生產者代碼是線程安全的,支持多線程,而消費者則不然

 

消費者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

 


consumer = KafkaConsumer('MY_TOPIC1',
                         bootstrap_servers=['127.0.0.1:9092'],
                         #auto_offset_reset='',
                         auto_offset_reset='latest',# 消費kafka中最近的數據,如果設置為earliest則消費最早的數據,不管這些數據是否消費
                         enable_auto_commit=True, # 自動提交消費者的offset
                         auto_commit_interval_ms=3000, ## 自動提交消費者offset的時間間隔
                         group_id='MY_GROUP1',

                         consumer_timeout_ms= 10000, # 如果10秒內kafka中沒有可供消費的數據,自動退出
                         client_id='consumer-python3'
                         )

 

for msg in consumer:

    print (msg)

    print('topic: ', msg.topic)

    print('partition: ', msg.partition)

    print('key: ', msg.key, 'value: ', msg.value)

    print('offset:', msg.offset)

    print('headers:', msg.headers)

 

# Get consumer metrics

metrics = consumer.metrics()

print(metrics)

 

運行效果

 

 

 

通過assign、subscribe兩者之一為消費者設置消費的主題

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                         auto_offset_reset='latest',

                         enable_auto_commit=True, # 自動提交消費數據的offset

                         consumer_timeout_ms= 10000, # 如果1秒內kafka中沒有可供消費的數據,自動退出

                         value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息

                         client_id='consumer-python3'

                         )

 

 

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

 

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

    print (msg)

 

 

API及常用參數說明:

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可選,設置需要訂閱的topic,如果未設置,需要在消費記錄前調用subscribe或者assign。

 

client_id (str) – 客戶端名稱,默認值: ‘kafka-python-{version}’

 

group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區分配,offset提交被禁用。默認為None

 

auto_offset_reset (str) – 重置offset策略: 'earliest'將移動到最老的可用消息, 'latest'將移動到最近消息。 設置為其它任何值將拋出異常。默認值:'latest'。

 

enable_auto_commit (bool) –  如果為True,將自動定時提交消費者offset。默認為True。

 

auto_commit_interval_ms (int) – 自動提交offset之間的間隔毫秒數。如果enable_auto_commit 為true,默認值為: 5000。

 

value_deserializer(可調用對象) - 攜帶原始消息value並返回反序列化后的value

 

subscribe(topics=(), pattern=None, listener=None)

訂閱需要的主題

topics (list) – 需要訂閱的主題列表

pattern (str) – 用於匹配可用主題的模式,即正則表達式。注意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。

 

metrics(raw=False)

獲取消費者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

客戶端

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka.client import KafkaClient

 

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

 

# 獲取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

    print('broker: ', broker)

    print('broker nodeId: ', broker.nodeId)

 

# 獲取主題的所有分區

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

 

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

 

 

運行結果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

 

API及常用參數說明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

client_id (str) – 客戶端名稱,默認值: ‘kafka-python-{version}’

 

request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。默認值: 30000.

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

 

brokers()

獲取所有broker元數據

 

available_partitions_for_topic(topic)

返回主題的所有分區

 

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM