1. Installing kafka-python

```shell
pip3 install kafka-python
```
2. Basic usage of kafka-python
- A minimal example
1. Consumer

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    group_id='group2',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)
for msg in consumer:
    print(msg)
```

- The first argument is the topic name
- group_id: the consumer group this instance belongs to; optional
- bootstrap_servers: the Kafka broker(s) to connect to
2. Producer

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)
```

producer.send() sends a message:
- The first argument is the topic name; required
- key: the message key; must be bytes; optional (but at least one of key and value must be given); defaults to None
- value: the message value; must be bytes; optional (but at least one of key and value must be given); defaults to None
- partition: the partition to send to; since Kafka's default configuration gives a topic a single partition, this is 0

future.get() blocks until this one message is delivered or the timeout expires. In testing, this call is necessary: send() is asynchronous and only buffers the message, so if the program exits immediately nothing goes out. future.get() (or producer.flush(), which waits for all buffered messages) forces delivery; a time.sleep() also happens to work, but is not reliable.
3. Anatomy of a sent/received message
A message received on the consumer side looks like this:

```python
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
```

- topic
- partition
- offset: this message's offset within the partition
- timestamp: the message timestamp
- timestamp_type: the timestamp type
- key: the key, as bytes
- value: the value, as bytes
- checksum: the message checksum
- serialized_key_size: size of the serialized key in bytes
- serialized_value_size: size of the serialized value in bytes; note that when value=None the size is -1
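As a sketch of how these fields are read in practice, here is attribute access on a namedtuple stand-in with the same field names (an assumption made so no broker is needed; in real code `msg` comes from iterating a KafkaConsumer):

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord, which is
# itself a namedtuple with these (and a few more) fields.
Record = namedtuple('Record', 'topic partition offset timestamp key value')
msg = Record('my_topic', 0, 4, 1529569531392, b'my_key', b'my_value')

# Every field is available as a plain attribute on the record:
print(msg.topic, msg.partition, msg.offset)  # my_topic 0 4
print(msg.value.decode('utf-8'))             # my_value
```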
KafkaConsumer
- Manually assigning partitions

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)
```
- Timing out

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    group_id='group2',
    bootstrap_servers=['localhost:9092'],
    consumer_timeout_ms=1000,
)
for msg in consumer:
    print(msg)
```

Without consumer_timeout_ms, iteration blocks forever waiting for new messages; with it, the iterator stops (the loop ends) once no message arrives within the timeout.
consumer_timeout_ms is in milliseconds.
- Subscribing to multiple topics

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
```

This receives messages from several topics at once.
You can also subscribe to a whole family of topics with a regular expression:

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    group_id='group2',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
```
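The pattern given to subscribe() is an ordinary regular expression matched against topic names; a quick broker-free illustration of what '^my.*' selects (the topic list here is made up):

```python
import re

# Hypothetical topic names; '^my.*' matches any name starting with "my".
topics = ['my_topic', 'my_events', 'topic_1']
matched = [t for t in topics if re.match('^my.*', t)]
print(matched)  # ['my_topic', 'my_events']
```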
- Decoding JSON data
Encoding (producer): value_serializer
Decoding (consumer): value_deserializer
1. First, the producer sends JSON data:

```python
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: json.dumps(m).encode('ascii'),
)
future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)
future.get(timeout=10)
```

2. A consumer without a deserializer receives:

```python
ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)
```

Here value is the raw JSON bytes, which still need one more decoding step.
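That extra decoding step is just a json.loads on the raw bytes:

```python
import json

# The raw value bytes from the ConsumerRecord shown above:
raw = b'{"value_1": "value_2"}'
decoded = json.loads(raw.decode('ascii'))
print(decoded)  # {'value_1': 'value_2'}
```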
3. A consumer that decodes automatically:

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    group_id='group2',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
)
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
```

Received result:

```python
ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
```

- Note that value has been decoded automatically, into a Python dict
- This works for keys as well as values: just pass key_deserializer
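As a broker-free sketch of that symmetry: the producer-side key_serializer and consumer-side key_deserializer are exact inverses, just like the value hooks:

```python
import json

# The same callables a producer/consumer pair would be configured with:
key_serializer = lambda k: json.dumps(k).encode('ascii')
key_deserializer = lambda k: json.loads(k.decode('ascii'))

wire = key_serializer({'id': 7})  # what would go over the wire
print(wire)                       # b'{"id": 7}'
print(key_deserializer(wire))     # {'id': 7}
```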
KafkaProducer
- Sending str keys and values

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=str.encode,
)
future = producer.send('my_topic', key='key_3', value='value_3', partition=0)
future.get(timeout=10)
```

With key_serializer and value_serializer set to str.encode, the producer accepts str arguments, but the consumer still receives bytes.
For the consumer to get str back, add the matching decoding step, e.g. key_deserializer=bytes.decode:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    group_id='group2',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=bytes.decode,
    value_deserializer=bytes.decode,
)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
```
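The round trip these two settings implement can be checked in isolation, without a broker:

```python
# str.encode on the producer side, bytes.decode on the consumer side:
wire_key = str.encode('key_3')     # b'key_3', what the broker stores
restored = bytes.decode(wire_key)  # 'key_3', what the consumer yields
print(restored)
```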
- Sending compressed messages
compression_type='gzip'
If messages are large, they can be compressed on send; valid values are 'gzip', 'snappy', 'lz4', or None.

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)
```
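kafka-python applies the compression internally when batching; as a rough stdlib illustration of the size win gzip gives on a repetitive payload (payload made up for illustration):

```python
import gzip
import json

# A deliberately repetitive JSON payload:
payload = json.dumps({'key': 'value' * 1000}).encode('utf-8')
compressed = gzip.compress(payload)
print(len(payload), len(compressed))  # compressed is far smaller
```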
- Sending msgpack messages
msgpack is short for MessagePack, an efficient binary serialization library, more compact than JSON (requires the msgpack package).

```python
import msgpack

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=msgpack.dumps,
)
producer.send('msgpack-topic', {'key': 'value'})
producer.flush()  # wait for delivery, as discussed above
```
end