kafka-python的API簡單介紹


在上一篇文章中說明了kafka-python的API使用的理論概念,這篇文章來說明API的實際使用。

在官方文檔詳細列出了kafka-python的API接口https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

對於生成者我們着重於介紹一個send方法,其余的方法提到的時候會說明,在官方文檔中有許多可配置參數可以查看,也可以查看上一篇博文中的參數。

#send方法的詳細說明,send用於向主題發送信息
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic (str) – topic where the message will be published,指定向哪個主題發送消息。
value (optional) – message value. Must be type bytes, or be serializable to bytes via configured value_serializer. If value is None, key is required and message acts as a ‘delete’.
#value為要發送的消息值,必須為bytes類型,如果這個值為空,則必須有對應的key值,並且空值被標記為刪除。可以通過配置value_serializer參數序列化為字節類型。
key (optional) – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default),
then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be
type bytes, or be serializable to bytes via configured key_serializer.
         #key與value對應的鍵值,必須為bytes類型。kafka根據key值確定消息發往哪個分區(如果分區被指定則發往指定的分區),具有相同key的消息被發往同一個分區,如果key
#為NONE則隨機選擇分區,可以使用key_serializer參數序列化為字節類型。
headers (optional) – a list of header key value pairs. List items are tuples of str key and bytes value.
#鍵值對的列表頭部,列表項是str(key)和bytes(value)。
timestamp_ms (int, optional) – epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
#時間戳

消息發送成功,返回的是RecordMetadata的對象;否則的話引發KafkaTimeoutError異常

在進行實際測試前,先創建一個topics,這里我們利用控制台創建:

[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --create --topic kafkatest --replication-factor 3 --partitions 3
Created topic "kafkatest".
[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --list --topic kafkatest
kafkatest
[root@test3 bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --describe --topic kafkatest
Topic:kafkatest    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: kafkatest    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: kafkatest    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: kafkatest    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
[root@test3 bin]#  

#主題有3個分區,3個復制系數,主題名為kafkatest.

 

一個簡易的生產者demo如下:(摘自:https://blog.csdn.net/luanpeng825485697/article/details/81036028

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["10.0.102.214:9092"])

i = 20
while True:
    i += 1
    msg = "producer1+%d" % i
    print(msg)
    producer.send('kafkatest', key=bytes(str(i), value=msg.encode('utf-8'))
    time.sleep(1)

producer.close()

#就是一個簡易的while循環,不停的向kafka發送消息,一定要注意send發送的key和value的值均為bytes類型。

一個消費者的demo接收上面生產者發送的數據。

from kafka import KafkaConsumer

consumer = KafkaConsumer("kafkatest", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='latest')
for msg in consumer:
    key = msg.key.decode(encoding="utf-8")               #因為接收到的數據時bytes類型,因此需要解碼
    value = msg.value.decode(encoding="utf-8")
    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))

#這是一個阻塞的過程,當生產者有消息傳來的時候,就會讀取消息,若是沒有消息就會阻塞等待
#auto_offset_reset參數表示重置偏移量,有兩個取值,latest表示讀取消息隊列中最新的消息,另一個取值earliest表示讀取最早的消息。

執行上面的兩個demo,得到的結果如下:

消費者群組

在上一篇博文中,說明了消費者群組與消費者的概念,這里我們來定義一個消費者群組。

一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。每個消費者接收主題一部分分區的消息

創建一個消費者群組如下:

from kafka import KafkaConsumer
import time

#消費者群組中有一個group_id參數, consumer
= KafkaConsumer("kafkatest", group_id="test1", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='latest') for msg in consumer: key = msg.key.decode(encoding="utf-8") value = msg.value.decode(encoding="utf-8") print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))

消費者群組中的消費者總是消費訂閱主題的部分數據。

在pycharm中把上面的代碼復制一份,這樣在一個test1群組中就有了兩個消費者,同時執行。

分析: kafkatest主題有3個分區,3個分區會被分配給test1群組中的兩個消費者,在上面一篇博文中提到,默認的分配策略時range。也就是說一個消費者可能由2個分區,另一個消費者只有一個分區;執行結果如下:

下面會通過實例來說明幾個消費者的方法的使用

 kafka-python的API官方文檔介紹的很清楚,可以查看:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer("kafkatest", group_id="test1", bootstrap_servers=["10.0.102.204:9092"])
>>> consumer.topics() #獲取主主題列表,返回的是一個set集合 {'kafkatest', 'lianxi', 'science'} >>> consumer.partitions_for_topic("kafkatest") #獲取主題的分區信息 {0, 1, 2} >>> consumer.subscription() #獲取當前消費者訂閱的主題 {'kafkatest'}

>>> consumer.position((0,))      #得到下一個記錄的偏移量

TypeError: partition must be a TopicPartition namedtuple

#需要注意的是position方法需要傳入的是一個kafka-python自帶的一種數據結構TopicPartition,這種數據結構的定義如下,在使用的時候需要導入
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

>>> consumer.position(TopicPartition(topic='kafkatest', partition=1))
17580

下面說明poll()方法的用法:

poll(timeout_ms=0, max_records=None)方法: 從指定的主題/分區中獲取數據
Records are fetched and returned in batches by topic-partition. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek() or automatically set as the last committed offset for the subscribed list of partitions.
#通過主題-分區分批獲取和返回記錄,在每一個輪詢中,消費者將會使用最后消費的偏移量作為開始然后順序fetch數據。最后消費的偏移量可以使用seek()手動設置,或者自動設置為訂閱
#的分區列表的最后提交的偏移量。 Incompatible with iterator interface – use one or the other, not both. 與迭代器的接口是對立的。 timeout_ms (
int, optional) – Milliseconds spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any
                records that are available currently in the buffer, else returns empty. Must not be negative. Default: 0 max_records (int, optional) – The maximum number of records returned in a single call to poll(). Default: Inherit value from max_poll_records.
默認從max_poll_records繼承值。
#一個簡答的實例從kafka拉取數據
from kafka import KafkaConsumer import
time consumer = KafkaConsumer("kafkatest", bootstrap_servers=['10.0.102.204:9092']) while True: msg = consumer.poll(timeout_ms=5) print(msg) time.sleep(2)


#執行結果如下,返回的是一個字典,consumerRecord對象包含着消息的一些元數據信息
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21929, timestamp=1545978879892, timestamp_type=0, key=b'138', value=b'producer1+138', checksum=-660348132, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22064, timestamp=1545978882893, timestamp_type=0, key=b'141', value=b'producer1+141', checksum=-1803506349, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21930, timestamp=1545978880892, timestamp_type=0, key=b'139', value=b'producer1+139', checksum=-1863433503, serialized_key_size=3, serialized_value_size=13), ConsumerRecord(topic='kafkatest', partition=2, offset=21931, timestamp=1545978881893, timestamp_type=0, key=b'140', value=b'producer1+140', checksum=-280146643, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21932, timestamp=1545978884894, timestamp_type=0, key=b'143', value=b'producer1+143', checksum=1459018748, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=1): [ConsumerRecord(topic='kafkatest', partition=1, offset=22046, timestamp=1545978883894, timestamp_type=0, key=b'142', value=b'producer1+142', checksum=-2023137030, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22065, timestamp=1545978885894, timestamp_type=0, key=b'144', value=b'producer1+144', checksum=1999922748, serialized_key_size=3, serialized_value_size=13)]}

seek()方法的用法:

seek(partition, offset)    
    
Manually specify the fetch offset for a TopicPartition.
#手動指定拉取主題的偏移量

Overrides the fetch offsets that the consumer will use on the next poll(). If this API is invoked for the same partition more than once, 
the latest offset will be used on the next poll(). #覆蓋下一個消費者使用poll()拉取的偏移量。如果這個API對同一個分區執行了多次,那么最后一個次的結果將會被使用。 Note: You may lose data
if this API is arbitrarily used in the middle of consumption to reset the fetch offsets. #如果在消費過程中任意使用此API以重置提取偏移,則可能會丟失數據。


#實例如下
>>> consumer.position(TopicPartition(topic="kafkatest",partition=1))
22103

#使用seek()設置偏移量
>>> consumer.seek(partition=TopicPartition("kafkatest",1),offset=22222)
#需要說明的是seek函數有一個partition參數,但是這個參數必須是TopicPartition類型的。

>>> consumer.position(TopicPartition(topic="kafkatest",partition=1))
22222

與seek相關的還有兩個方法:

seek_to_beginning(*partitions)
#尋找分區最早可用的偏移量
seek_to_end(*partitions)
#尋找分區最近可用的偏移量

>>> consumer.seek_to_beginning(TopicPartition("kafkatest",1))
>>> consumer.seek_to_end(TopicPartition("kafkatest",1))

#注意這兩個方法的參數都是TopicPartition類型。

subscribe()方法,給當前消費者訂閱主題。

Subscribe to a list of topics, or a topic regex pattern.
#訂閱一個主體列表,或者主題的正則表達式
Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).
#分區將會通過分區協調器自動分配。主題訂閱不是增量的,這個列表將會替換已經存在的主題。

This method is incompatible with assign().    
#這個方法與assign()方法是不兼容的。    

#說明一下listener參數:監聽回調,該回調將在每次重新平衡操作之前和之后調用。
作為組管理的一部分,消費者將跟蹤屬於特定組的使用者列表,並在以下事件之一觸發時觸發重新平衡操作:

  任何訂閱主題的分區數都會發生變化   主題已創建或刪除   消費者組織的現有成員死亡   將新成員添加到使用者組 觸發任何這些事件時,將首先調用提供的偵聽器以指示已撤消使用者的分配,然后在收到新分配時再次調用。請注意,此偵聽器將立即覆蓋先前對subscribe的調用中設置的任何偵聽器。

但是,可以保證通過此接口撤消/分配的分區來自此呼叫中訂閱的主題。

>>> consumer.subscription()                       #當前消費者訂閱的主題
{'lianxi'}
>>> consumer.subscribe(("kafkatest","lianxi"))    #訂閱主題,會覆蓋之前的主題
>>> consumer.subscription()                       #可以看到已經覆蓋
{'lianxi', 'kafkatest'}

unsubscribe() :取消訂閱所有主題並清除所有已分配的分區。

assign(partitions)

Manually assign a list of TopicPartitions to this consumer.
#手動將TopicPartitions指定給此消費者。
#這個函數和subscribe函數不能同時使用
>>> consumer.assign(TopicPartition("kafkatest",1))

assignment():

Get the TopicPartitions currently assigned to this consumer.
如果分區是使用assign()直接分配的,那么這將只返回先前分配的相同分區。如果使用subscribe()訂閱了主題,那么這將給出當前分配給使用者的主題分區集(如果分配尚未發生,
或者分區正在重新分配的過程中,則可能是None)

beginning_offsets(partitions)

Get the first offset for the given partitions.     #得到給定分區的第一個偏移量

This method does not change the current consumer position of the partitions. #這個方法不會改變當前消費者的偏移量

This method may block indefinitely if the partition does not exist.  #這個方法可能會阻塞,如果給定的分區沒有出現。

partitions參數仍然是TopicPartition類型。
>>> consumer.beginning_offsets(TopicPartition("kafkatest",1))
#這個方法在kafka-python-1.3.1中沒有

close(autocommit=True)

Close the consumer, waiting indefinitely for any needed cleanup.   #關閉消費者,阻塞等待所需要的清理。

Keyword Arguments:
     autocommit (bool) – If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any 
pending consumed offsets prior to close. Default: True
#如果為此使用者配置了自動提交,則此可選標志會導致使用者在關閉之前嘗試提交任何待處理的消耗偏移量。默認值:True

commit(offsets=None)

Commit offsets to kafka, blocking until success or error.
#提交偏移量到kafka,阻塞直到成功或者出錯
這只向Kafka提交偏移量。使用此API提交的偏移量將在每次重新平衡之后的第一次取出時以及在啟動時使用。因此,如果需要在Kafka以外的任何地方存儲偏移,則不應該使用此API。
為了避免在重新啟動使用者時重新處理讀取的最后一條消息,提交的偏移量應該是應用程序應該使用的下一條消息,即:last_offset
+1

Parameters:    offsets (dict, optional) – {TopicPartition: OffsetAndMetadata} dict to commit with the configured group_id. Defaults to
currently consumed offsets for all subscribed partitions.

commit_async(offsets=None, callback=None)

Commit offsets to kafka asynchronously, optionally firing callback.
#異步提交,可選擇的觸發回調,其余的和上面的commit一樣。

committed(partition)

Get the last committed offset for the given partition.

This offset will be used as the position for the consumer in the event of a failure.

如果有問題的分區未分配給此使用者,或者使用者尚未初始化其已提交偏移量緩存,則此調用可能會阻止執行遠程調用。
>>> consumer.committed(TopicPartition("kafkatest",1))
22103

pase, pased和resume

pase:暫停當前正在進行的請求。需要使用resume恢復
pased:獲取使用pase暫停時的分區信息
resume: 從pase狀態恢復。
除了pased之外,其余兩個方法的參數均為TopicPartation類型

kafka-python除了有消費者和生成者之外,還有一個客戶端,下面我們來說明客戶端API。

客戶端API

客戶端API的官方文檔為: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

簡單說明怎么使用客戶端API創建主題。

>>> from kafka.client import KafkaClient
>>> kc = KafkaClient(bootstrap_servers="10.0.102.204:9092")
>>> kc.config        #配置還是蠻多的
{'bootstrap_servers': '10.0.102.204:9092', 'client_id': 'kafka-python-1.3.1', 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_password': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'selector': <class 'selectors.SelectSelector'>, 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None}
#這些參數的具體意思可以查看上面的官方文檔。

>>> kc.add_topic("clent-1") #添加主題
<kafka.future.Future object at 0x0000000003A92320>

kafka-python還提供了其余兩個API,broker連接API和集群連接API

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM