使用pykafka,kafka-python的api開發kafka生產者和消費者


來源於  https://blog.csdn.net/learn_tech/article/details/81115996

 

轉載地址:https://blog.csdn.net/ricky110/article/details/79157043

                  https://blog.csdn.net/DilemmaVF/article/details/71124060

                  https://blog.csdn.net/yanhx1204/article/details/54965012

                  https://blog.csdn.net/see_you_see_me/article/details/78468421

目錄

一、簡介

二、pykafka

(1) pykafka安裝

(2) pykafka的api

(3) pykafka生產者api

(4) pykafka消費者api

三、kafka-python

(1) kafka-python安裝

(2) kafka-python的api

(3) kafka-python生產者

(4) kafka-python消費者

一、簡介
     python連接kafka的標准庫,kafka-python和pykafka。kafka-python使用的人多是比較成熟的庫,kafka-python並沒有zk的支持。pykafka是Samsa的升級版本,使用samsa連接zookeeper,生產者直接連接kafka服務器列表,消費者才用zookeeper。使用kafka Cluster。

二、pykafka
(1) pykafka安裝
根據機器環境從以下三種方式中選擇進行一種安裝pykafka,版本號是2.7.0。

# PyPI安裝
pip install pykafka

# conda安裝
conda install -c conda-forge pykafka

# anaconda自帶pip安裝
/root/anaconda3/bin/pip install pykafka
(2) pykafka的api
     1、http://pykafka.readthedocs.io/en/latest/,https://github.com/Parsely/pykafka

     2、在pykafka安裝目錄site-packages/pykafka/下,直接查看。

(3) pykafka生產者api
#coding=utf-8

import time
from pykafka import KafkaClient


class KafkaTest(object):
"""
測試kafka常用api
"""
def __init__(self, host="192.168.237.129:9092"):
self.host = host
self.client = KafkaClient(hosts=self.host)

def producer_partition(self, topic):
"""
生產者分區查看,主要查看生產消息時offset的變化
:return:
"""
topic = self.client.topics[topic.encode()]
partitions = topic.partitions
print (u"查看所有分區 {}".format(partitions))

earliest_offset = topic.earliest_available_offsets()
print(u"獲取最早可用的offset {}".format(earliest_offset))

# 生產消息之前看看offset
last_offset = topic.latest_available_offsets()
print(u"最近可用offset {}".format(last_offset))

# 同步生產消息
p = topic.get_producer(sync=True)
p.produce(str(time.time()).encode())

# 查看offset的變化
last_offset = topic.latest_available_offsets()
print(u"最近可用offset {}".format(last_offset))

def producer_designated_partition(self, topic):
"""
往指定分區寫消息,如果要控制打印到某個分區,
需要在獲取生產者的時候指定選區函數,
並且在生產消息的時候額外指定一個key
:return:
"""

def assign_patition(pid, key):
"""
指定特定分區, 這里測試寫入第一個分區(id=0)
:param pid: 為分區列表
:param key:
:return:
"""
print("為消息分配partition {} {}".format(pid, key))
return pid[0]

topic = self.client.topics[topic.encode()]
p = topic.get_producer(sync=True, partitioner=assign_patition)
p.produce(str(time.time()).encode(), partition_key=b"partition_key_0")

def async_produce_message(self, topic):
"""
異步生產消息,消息會被推到一個隊列里面,
另外一個線程會在隊列中消息大小滿足一個閾值(min_queued_messages)
或到達一段時間(linger_ms)后統一發送,默認5s
:return:
"""
topic = self.client.topics[topic.encode()]
last_offset = topic.latest_available_offsets()
print("最近的偏移量 offset {}".format(last_offset))

# 記錄最初的偏移量
old_offset = last_offset[0].offset[0]
p = topic.get_producer(sync=False, partitioner=lambda pid, key: pid[0])
p.produce(str(time.time()).encode())
s_time = time.time()
while True:
last_offset = topic.latest_available_offsets()
print("最近可用offset {}".format(last_offset))
if last_offset[0].offset[0] != old_offset:
e_time = time.time()
print('cost time {}'.format(e_time-s_time))
break
time.sleep(1)

def get_produce_message_report(self, topic):
"""
查看異步發送消報告,默認會等待5s后才能獲得報告
"""
topic = self.client.topics[topic.encode()]
last_offset = topic.latest_available_offsets()
print("最近的偏移量 offset {}".format(last_offset))
p = topic.get_producer(sync=False, delivery_reports=True, partitioner=lambda pid, key: pid[0])
p.produce(str(time.time()).encode())
s_time = time.time()
delivery_report = p.get_delivery_report()
e_time = time.time()
print ('等待{}s, 遞交報告{}'.format(e_time-s_time, delivery_report))
last_offset = topic.latest_available_offsets()
print("最近的偏移量 offset {}".format(last_offset))


if __name__ == '__main__':
host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
kafka_ins = KafkaTest(host)
topic = 'test'
# kafka_ins.producer_partition(topic)
# kafka_ins.producer_designated_partition(topic)
# kafka_ins.async_produce_message(topic)
kafka_ins.get_produce_message_report(topic)
注意要點:

多進程使用pykafka共享一個client,會造成只有進程能夠正常的寫入數據,如果使用了dliver_report(包括同步),會導致子進程徹底阻塞掉不可用
使用producer.produce發送數據出現故障,如下
#!/bin/env python
from pykafka import KafkaClient
host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts = host)
topic = client.topics["test"]
with topic.get_sync_producer() as producer:
for i in range(100):
producer.produce('test message ' + str(i ** 2))
報錯:

Traceback (most recent call last):
File "TaxiKafkaProduce.py", line 15, in <module>
producer.produce(('test message ' + str(i ** 2)))
File "/root/anaconda3/lib/python3.6/site-packages/pykafka/producer.py", line 325, in produce
"got '%s'", type(message))
TypeError: ("Producer.produce accepts a bytes object as message, but it got '%s'", <class 'str'>)
是因為kafka傳遞的字節,因此在傳遞字符串處encode()即可,分別是client.topics和producer.produce(),如下:

#!/bin/env python
from pykafka import KafkaClient
host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts = host)
topic = client.topics["test".encode()]
# 將產生kafka同步消息,這個調用僅僅在我們已經確認消息已經發送到集群之后
with topic.get_sync_producer() as producer:
for i in range(100):
producer.produce(('test message ' + str(i ** 2)).encode())
同步與異步
from pykafka import KafkaClient
#可接受多個client
client = KafkaClient(hosts ="192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092")
#查看所有的topic
client.topics
print client.topics

topic = client.topics['test_kafka_topic']#選擇一個topic

message = "test message test message"
# 當有了topic之后呢,可以創建一個producer,來發消息,生產kafka數據,通過字符串形式,
with topic.get_sync_producer() as producer:
producer.produce(message)
# 以上的例子將產生kafka同步消息,這個調用僅僅在我們已經確認消息已經發送到集群之后

#但生產環境,為了達到高吞吐量,要采用異步的方式,通過delivery_reports =True來啟用隊列接口;
producer = topic.get_producer(sync=False, delivery_reports=True)
producer.produce(message)
try:
msg, exc = producer.get_delivery_report(block=False)
if exc is not None:
print 'Failed to deliver msg {}: {}'.format(msg.partition_key, repr(exc))
else:
print 'Successfully delivered msg {}'.format(msg.partition_key)
except Queue.Empty:
pass
(4) pykafka消費者api
pykafka消費者分為simple和balanced兩種 

simple適用於需要消費指定分區且不需要自動的重分配(自定義)
balanced自動分配則選擇
#coding=utf-8

from pykafka import KafkaClient


class KafkaTest(object):
def __init__(self, host="192.168.237.129:9092"):
self.host = host
self.client = KafkaClient(hosts=self.host)

def simple_consumer(self, topic, offset=0):
"""
消費者指定消費
:param offset:
:return:
"""

topic = self.client.topics[topic.encode()]
partitions = topic.partitions
last_offset = topic.latest_available_offsets()
print("最近可用offset {}".format(last_offset)) # 查看所有分區
consumer = topic.get_simple_consumer(b"simple_consumer_group", partitions=[partitions[0]]) # 選擇一個分區進行消費
offset_list = consumer.held_offsets
print("當前消費者分區offset情況{}".format(offset_list)) # 消費者擁有的分區offset的情況
consumer.reset_offsets([(partitions[0], offset)]) # 設置offset
msg = consumer.consume()
print("消費 :{}".format(msg.value.decode()))
msg = consumer.consume()
print("消費 :{}".format(msg.value.decode()))
msg = consumer.consume()
print("消費 :{}".format(msg.value.decode()))
offset = consumer.held_offsets
print("當前消費者分區offset情況{}".format(offset)) # 3

def balance_consumer(self, topic, offset=0):
"""
使用balance consumer去消費kafka
:return:
"""
topic = self.client.topics["kafka_test".encode()]
# managed=True 設置后,使用新式reblance分區方法,不需要使用zk,而False是通過zk來實現reblance的需要使用zk
consumer = topic.get_balanced_consumer(b"consumer_group_balanced2", managed=True)
partitions = topic.partitions
print("分區 {}".format(partitions))
earliest_offsets = topic.earliest_available_offsets()
print("最早可用offset {}".format(earliest_offsets))
last_offsets = topic.latest_available_offsets()
print("最近可用offset {}".format(last_offsets))
offset = consumer.held_offsets
print("當前消費者分區offset情況{}".format(offset))
while True:
msg = consumer.consume()
offset = consumer.held_offsets
print("{}, 當前消費者分區offset情況{}".format(msg.value.decode(), offset))

if __name__ == '__main__':
host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
kafka_ins = KafkaTest(host)
topic = 'test'
# kafka_ins.simple_consumer(topic)
kafka_ins.balance_consumer(topic)
使用consumber_group和consumer_id
# -* coding:utf8 *-
from pykafka import KafkaClient

host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts = host)

print(client.topics)

# 消費者
topic = client.topics['test'.encode()]
consumer = topic.get_simple_consumer(consumer_group='test_group',
# 設置為False的時候不需要添加consumer_group,直接連接topic即可取到消息
auto_commit_enable=True,
auto_commit_interval_ms=1,
#這里就是連接多個zk
zookeeper_connect='192.168.17.64:2181,192.168.17.65:2181,192.168.17.68:2181'
consumer_id='test_id')

for message in consumer:
if message is not None:
#打印接收到的消息體的偏移個數和值
print(message.offset, message.value)
報錯:AttributeError: 'SimpleConsumer' object has no attribute '_consumer_group'

       是因為kafka在傳輸的時候需要bytes,而不是str,所以在str上加上b標識就可以,如下:

# -* coding:utf8 *-
from pykafka import KafkaClient

host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
client = KafkaClient(hosts = host)

print(client.topics)

# 消費者
topic = client.topics['test'.encode()]
consumer = topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test_id')

for message in consumer:
if message is not None:
print(message.offset, message.value.decode('utf-8'))
不要重復消費,對已經消費過的信息進行舍棄
consumer = topic.get_simple_consumer(consumer_group=b'test_group',
auto_commit_enable=True,
auto_commit_interval_ms=1,
consumer_id=b'test_id')
      不希望得到歷史數據的時候,需要使用auto_commit_enable這個參數。 

當consumer_group=b'test_group',運行一次后,能夠得到正常數據;再一次后,就數據讀取不到了,如下:
{b'kafka_test': None, b'test': None}
       如果要在讀取一次數據,就需要修改consumber_group的id,例如修改成consumber_group=b'test_group_1'后,再運行一次,就可以正常讀取數據了。

        因為:是kafka的訂閱原理,同一個group下,消費之后已經讀取完,如果想得到數據必須修改consumber_group_id。

        group是消費者中的概念,按照group(組)對消費者進行區分。對於每個group,需要先指定訂閱哪個topic的消息,然后該topic下的partition會平均分配到group下面的consumer上。所以會出現以下這些情況:
        1、一個topic被多個group訂閱,那么一條消息就會被不同group中的多個consumer處理。
        2、同一個group中,每個partition只會被一個consumer處理,這個consumer處理的消息不一定是同一個key的。所以需要在處理的地方判斷。

三、kafka-python
(1) kafka-python安裝
# PyPI安裝
pip install kafka-python

# conda安裝
conda install -c conda-forge kafka-python

# anaconda自帶pip安裝
/root/anaconda3/bin/pip install kafka-python
(2) kafka-python的api
     https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

     https://kafka-python.readthedocs.io/en/master/index.html

     https://pypi.org/project/kafka-python/

(3) kafka-python生產者
import time
from kafka import KafkaProducer


producer = KafkaProducer(bootstrap_servers = ['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'])
# Assign a topic
topic = 'test'

def test():
print('begin')
n = 1
try:
while (n<=100):
producer.send(topic, str(n).encode())
print("send" + str(n))
n += 1
time.sleep(0.5)
except KafkaError as e:
print(e)
finally:
producer.close()
print('done')

if __name__ == '__main__':
test()
(4) kafka-python消費者
#!/bin/env python
from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('test', group_id = 'test_group', bootstrap_servers = ['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'])
try:
for msg in consumer:
print(msg)
print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition,msg.offset, msg.key, msg.value))
except KeyboardInterrupt, e:
print(e)
輸出結果:

ConsumerRecord(topic='test', partition=0, offset=246, timestamp=1531980887190, timestamp_type=0, key=None, value=b'1', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=247, timestamp=1531980887691, timestamp_type=0, key=None, value=b'2', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=248, timestamp=1531980888192, timestamp_type=0, key=None, value=b'3', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=249, timestamp=1531980888694, timestamp_type=0, key=None, value=b'4', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=250, timestamp=1531980889196, timestamp_type=0, key=None, value=b'5', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=251, timestamp=1531980889697, timestamp_type=0, key=None, value=b'6', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=252, timestamp=1531980890199, timestamp_type=0, key=None, value=b'7', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=253, timestamp=1531980890700, timestamp_type=0, key=None, value=b'8', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=254, timestamp=1531980891202, timestamp_type=0, key=None, value=b'9', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=255, timestamp=1531980891703, timestamp_type=0, key=None, value=b'10', checksum=None, serialized_key_size=-1, serialized_value_size=2)
enable_auto_commit=False
consumer = kafka.KafkaConsumer(bootstrap_servers = ['192.168.17.64:9092','192.168.17.65:9092','192.168.17.68:9092'],
group_id ='test_group_id',
auto_offset_reset ='latest',
enable_auto_commit = False)
       自動提交位移設為flase, 默認為取最新的偏移量,重新建立一個group_id,這樣就實現了不影響別的應用程序消費數據,又能消費到最新數據,實現預警(先於用戶發現)的目的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM