kafka-python Basic Usage


1. Installing kafka-python

  pip3 install kafka-python

2. Basic usage of kafka-python

  • The simplest example

1. Consumer side

from kafka import KafkaConsumer

# Subscribe to my_topic as a member of consumer group 'group2'
consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
# Iterating blocks and yields one ConsumerRecord per message
for msg in consumer:
    print(msg)
  • The first argument is the topic name
  • group_id: the name of the consumer group this instance belongs to; optional
  • bootstrap_servers: the Kafka broker(s) to connect to

2. Producer side

from kafka import KafkaProducer

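producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# send() is asynchronous: it returns a future immediately
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
# Block for up to 10 s until the broker acknowledges; returns the record's metadata
result = future.get(timeout=10)
print(result)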

producer.send sends a message:

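  • The first argument is the topic name; required
  • key: the message key; must be a byte string unless a key_serializer is configured. Optional, defaults to None, but at least one of key and value must be given
  • value: the message value; must be a byte string unless a value_serializer is configured. Optional, defaults to None, but at least one of key and value must be given
  • partition: the partition to send to. It is 0 here because an auto-created topic gets a single partition by default (broker setting num.partitions=1); if omitted, the producer's partitioner picks one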
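When partition is omitted, a partitioner chooses it. As a hypothetical illustration (not kafka-python's default partitioner, which as far as I know hashes keys with murmur2), here is a key-hash partitioner written to the calling convention documented for kafka-python's `partitioner` setting, `partitioner(key_bytes, all_partitions, available_partitions)`. The function name and the CRC32 hash are assumptions made for this sketch.

```python
import random
import zlib


def crc32_partitioner(key_bytes, all_partitions, available_partitions):
    """Hypothetical partitioner: the same key always lands on the same partition."""
    if key_bytes is None:
        # No key: spread messages randomly over the partitions currently available
        return random.choice(available_partitions or all_partitions)
    # Keyed message: hash the key bytes (CRC32 here, not murmur2) onto the full list
    index = zlib.crc32(key_bytes) % len(all_partitions)
    return all_partitions[index]


partitions = [0, 1, 2]
print(crc32_partitioner(b'my_key', partitions, partitions))
```

Assuming that convention holds for your version, it would be passed as `KafkaProducer(bootstrap_servers=['localhost:9092'], partitioner=crc32_partitioner)`. With the default single partition every key maps to partition 0, which is why the example above can hard-code it.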

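future.get waits until a single message has been sent or the timeout expires. In the author's tests the message was not delivered without this call (time.sleep worked as a substitute). The likely reason is that send() only places the message in an in-memory buffer, which a background thread sends in batches; if the script exits first, the buffered message may never be sent. Calling producer.flush() (or producer.close()) before exiting serves the same purpose.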
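The buffering behaviour described above can be modelled offline. This is a hypothetical toy class, not kafka-python code: `send()` only queues the record and returns a `concurrent.futures.Future`, and nothing reaches the stand-in "log" until `flush()` runs, which is the situation a script ends up in if it exits right after `send()`.

```python
from concurrent.futures import Future


class ToyBufferedProducer:
    """Hypothetical toy model of an asynchronous, buffering producer."""

    def __init__(self):
        self._buffer = []  # (record, future) pairs waiting to be sent
        self.log = []      # stands in for the broker's partition log

    def send(self, topic, value):
        future = Future()
        self._buffer.append(((topic, value), future))
        return future      # returns at once; nothing has been delivered yet

    def flush(self):
        # Deliver every buffered record and resolve its future with the assigned offset
        for record, future in self._buffer:
            self.log.append(record)
            future.set_result(len(self.log) - 1)
        self._buffer.clear()


producer = ToyBufferedProducer()
future = producer.send('my_topic', b'my_value')
print(future.done(), producer.log)   # False []
producer.flush()
print(future.result(), producer.log)  # 0 [('my_topic', b'my_value')]
```

In this model, waiting on the future only works because something eventually flushes; in kafka-python that is the background sender thread, and future.get simply keeps the process alive until it has done so.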

 

3. Anatomy of a sent or received message

A message received by the consumer looks like this:

ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
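  • topic: the topic name
  • partition: the partition the message came from
  • offset: the offset of this message within its partition
  • timestamp: the timestamp, in milliseconds since the Unix epoch
  • timestamp_type: the timestamp type (0 = CreateTime, set by the producer; 1 = LogAppendTime, set by the broker)
  • key: the key, as bytes unless a key_deserializer is configured
  • value: the value, as bytes unless a value_deserializer is configured
  • checksum: the message checksum (None in this record)
  • serialized_key_size: the size of the serialized key in bytes
  • serialized_value_size: the size of the serialized value in bytes; as shown, it is -1 when value=None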
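The timestamp field becomes readable once converted. A minimal stdlib sketch: `Record` is a hypothetical namedtuple mirroring some of the ConsumerRecord fields shown above (a real kafka-python record exposes the same attribute names), and the values are taken from the record printed above.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in carrying a subset of the ConsumerRecord fields
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'timestamp',
                               'timestamp_type', 'key', 'value'])

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
TIMESTAMP_TYPES = {0: 'CreateTime', 1: 'LogAppendTime'}


def record_time(record):
    # timestamp is milliseconds since the Unix epoch; timedelta keeps the math exact
    return EPOCH + timedelta(milliseconds=record.timestamp)


def timestamp_kind(record):
    return TIMESTAMP_TYPES.get(record.timestamp_type, 'unknown')


msg = Record('my_topic', 0, 4, 1529569531392, 0, b'my_value', None)
print(record_time(msg).isoformat(), timestamp_kind(msg))
# 2018-06-21T08:25:31.392000+00:00 CreateTime
```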

 

KafkaConsumer

    • Manually assigning partitions
from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
# Read partition 0 of my_topic directly instead of subscribing (no group rebalancing)
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)
    • Timeout handling

 

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)

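If consumer_timeout_ms is not specified, iteration blocks waiting for messages indefinitely. If it is specified, the for loop ends once no message has arrived within that time.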

consumer_timeout_ms: a duration in milliseconds
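The iteration semantics can be sketched offline with the standard library. This is a hypothetical model, not kafka-python's implementation: a `queue.Queue` stands in for the topic, and the generator returns (ending the for loop) once no item arrives within `timeout_ms`.

```python
import queue
import time


def iter_with_timeout(q, timeout_ms):
    """Yield items from q; stop once no item arrives within timeout_ms.

    Hypothetical model of consumer_timeout_ms, not kafka-python code.
    """
    while True:
        try:
            yield q.get(timeout=timeout_ms / 1000)  # queue timeouts are in seconds
        except queue.Empty:
            return  # ends the for loop, like a consumer with consumer_timeout_ms set


q = queue.Queue()
for value in (b'a', b'b', b'c'):
    q.put(value)

start = time.monotonic()
received = list(iter_with_timeout(q, 1000))
print(received, round(time.monotonic() - start, 1))
```

The three queued items come back at once, and the loop then exits after roughly one idle second instead of blocking forever.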

 

    • Subscribing to multiple topics

 

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'])
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

This receives messages from several topics at once.

A regular expression can also be used to subscribe to every topic whose name matches it:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
    print(msg)
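To check which topics '^my.*' would cover, the pattern can be tested offline with Python's `re` module. The topic list below is hypothetical; since the pattern is anchored with '^', `re.match` and `re.search` give the same result here.

```python
import re

# The pattern passed to consumer.subscribe(pattern=...) above
pattern = re.compile('^my.*')

# Hypothetical topic names that might exist on the cluster
topics = ['my_topic', 'topic_1', 'my_orders', 'dummy_my']

# Only names that start with "my" are picked up
matching = [t for t in topics if pattern.match(t)]
print(matching)  # ['my_topic', 'my_orders']
```

Note that topic_1 from the previous example does not match, so this subscription would not receive its messages.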
  • Decoding JSON data

Encoding (producer): value_serializer

Decoding (consumer): value_deserializer

1. First, the producer sends JSON data:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic' ,  value= {'value_1' : 'value_2'}, partition= 0)
future.get(timeout= 10)

2. If the consumer does not decode the data, it receives:

ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

The value is the raw JSON bytes, which can then be decoded in a separate step.
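That separate decoding step is a single line. A minimal sketch using the raw value from the record above; inside a consumer loop the input would be `msg.value` instead of the literal.

```python
import json

# value as received by a consumer without value_deserializer (from the record above)
raw_value = b'{"value_1": "value_2"}'

# bytes -> str -> Python object
decoded = json.loads(raw_value.decode('ascii'))
print(decoded)         # {'value_1': 'value_2'}
print(len(raw_value))  # 22, matching serialized_value_size
```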

3. Letting the consumer decode automatically:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics= ['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

Result:

ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
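  • The value has been decoded automatically into a Python dict
  • JSON works for the key as well as the value: specify key_deserializer on the consumer (and key_serializer on the producer)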
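A matched serializer/deserializer pair can be checked offline before wiring it into both clients. The helper names and the JSON key are hypothetical. UTF-8 is used instead of the earlier 'ascii': json.dumps escapes non-ASCII characters by default, so both codecs work, but UTF-8 keeps working if `ensure_ascii=False` is ever set.

```python
import json


def json_serializer(obj):
    # Producer side: Python object -> JSON text -> bytes
    return json.dumps(obj).encode('utf-8')


def json_deserializer(data):
    # Consumer side: bytes -> JSON text -> Python object
    return json.loads(data.decode('utf-8'))


key = {'user_id': 42}  # hypothetical JSON key
value = {'value_1': 'value_2'}

key_bytes, value_bytes = json_serializer(key), json_serializer(value)
print(len(value_bytes))  # 22
print(json_deserializer(key_bytes), json_deserializer(value_bytes))
```

The same pair would then be passed as `key_serializer=json_serializer, value_serializer=json_serializer` to KafkaProducer and `key_deserializer=json_deserializer, value_deserializer=json_deserializer` to KafkaConsumer.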

 

KafkaProducer

    • Sending str keys and values

 

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer= str.encode, value_serializer= str.encode)
future = producer.send('my_topic' ,  key= 'key_3', value= 'value_3', partition= 0)
future.get(timeout= 10)

With key_serializer and value_serializer set to str.encode, the producer accepts str, but the consumer still receives byte strings.

For the consumer to receive str, it has to decode them, e.g. key_deserializer=bytes.decode (and likewise value_deserializer):

 
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id= 'group2', bootstrap_servers= ['localhost:9092'], key_deserializer= bytes.decode, value_deserializer= bytes.decode)
consumer.subscribe(pattern= '^my.*')
for msg in consumer:
    print(msg)
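str.encode and bytes.decode work as serializers because they are plain callables taking one argument. Earlier records in this topic were sent without a key (key=None). Rather than rely on how a particular kafka-python version treats a missing key, a hypothetical None-tolerant pair of helpers can be used in their place:

```python
def encode_str(s):
    # Producer side: str -> bytes, like str.encode, but passes None through
    return None if s is None else s.encode('utf-8')


def decode_bytes(b):
    # Consumer side: bytes -> str, like bytes.decode, but passes None through
    return None if b is None else b.decode('utf-8')


print(str.encode('key_3'))     # b'key_3'
print(bytes.decode(b'key_3'))  # key_3
print(decode_bytes(None))      # None (e.g. a record sent without a key)
```

They plug in exactly where str.encode and bytes.decode did, e.g. `key_deserializer=decode_bytes`.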
  • Sending compressed messages

compression_type='gzip'

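If messages are large, they can be compressed before sending. Valid values are 'gzip', 'snappy', 'lz4', or None; 'snappy' and 'lz4' require extra Python packages (python-snappy and lz4).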

 
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic' ,  key= b'key_3', value= b'value_3', partition= 0)
future.get(timeout= 10)
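To get a feel for the savings, a stdlib-only sketch compresses a repetitive payload with gzip. This is only an analogy: the producer compresses whole batches of messages internally and the consumer decompresses them transparently, so neither side calls gzip itself. Real savings depend on the data; small, unique messages may barely shrink.

```python
import gzip

# A repetitive JSON payload, the kind of data that compresses well
payload = b'{"value_1": "value_2"}' * 200

compressed = gzip.compress(payload)
print(len(payload), len(compressed))  # the compressed size is far smaller

# Decompressing restores the exact original bytes
assert gzip.decompress(compressed) == payload
```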
  • Sending msgpack messages

msgpack is short for MessagePack, a binary serialization library that is typically more compact and faster than JSON.

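import msgpack
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'}).get(timeout=10)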

 

end

 
 

