Kafka 入門(四)-- Python Kafka Client 性能測試


一、前言

  由於工作原因使用到了 Kafka,而現有的代碼並不能滿足性能需求,所以需要開發高效讀寫 Kafka 的工具,本文是一個 Python Kafka Client 的性能測試記錄,通過本次測試,可以知道選用什么第三方庫的性能最高,選用什么編程模型開發出來的工具效率最高。

 

二、第三方庫性能測試

1.第三方庫

  此次測試的是三個主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具體介紹見官網:

2.測試環境

       此次測試使用的 Python 版本是2.7,第三方庫的版本為:

  • pykafka:2.8.0
  • kafka-python:2.0.2
  • confluent-kafka:1.5.0

       使用的數據總量有50萬,每條數據大小為2KB,總共為966MB。

3.測試過程

(1)Kafka Producer 測試

  分別使用 pykafka、kafka-python 和 confluent-kafka 實例化一個 Kafka 的 Producer 對象,然后調用相應的 produce 方法將數據推送給 Kafka,數據總條數為50萬,比較三個庫所耗費的時間,並計算每秒鍾可以推送的數據條數和大小,比較得出性能最優的。

  代碼示例(以 pykafka 為例):

 1 import sys
 2 from datetime import datetime
 3 from pykafka import KafkaClient
 4 
 5 
 6 class KafkaProducerTool():
 7     def __init__(self, broker, topic):
 8         client = KafkaClient(hosts=broker)
 9         self.topic = client.topics[topic]
10         self.producer = self.topic.get_producer()
11 
12     def send_msg(self, msg):
13         self.producer.produce(msg)
14 
15 
16 if __name__ == '__main__':
17     producer = KafkaProducerTool(broker, topic)
18     print(datetime.now())
19     for line in sys.stdin:
20         producer.send_msg(line.strip())
21     producer.producer.stop()
22     print(datetime.now())

(2)Kafka Consumer 測試

  分別使用 pykafka、kafka-python 和 confluent-kafka 實例化一個 Kafka 的 Consumer 對象,然后調用相應的 consume 方法從 Kafka 中消費數據,要消費下來的數據總條數為50萬,比較三個庫所耗費的時間,並計算每秒鍾可以消費的數據條數和大小,比較得出性能最優的。

  代碼示例(以 pykafka 為例):

 1 from datetime import datetime
 2 from pykafka import KafkaClient
 3 
 4 
 5 class KafkaConsumerTool():
 6     def __init__(self, broker, topic):
 7         client = KafkaClient(hosts=broker)
 8         self.topic = client.topics[topic]
 9         self.consumer = self.topic.get_simple_consumer()
10 
11     def receive_msg(self):
12         count = 0
13         print(datetime.now())
14         while True:
15             msg = self.consumer.consume()
16             if msg:
17                 count += 1
18             if count == 500000:
19                 print(datetime.now())
20                 return
21 
22 
23 if __name__ == '__main__':
24     consumer = KafkaConsumerTool(broker, topic)
25     consumer.receive_msg()
26     consumer.consumer.stop()

4.測試結果

  • Kafka Producer 測試結果:
  總耗時/秒 每秒數據量/MB 每秒數據條數
confluent_kafka 35 27.90 14285.71
pykafka 50 19.53 10000
kafka-python 532 1.83 939.85
  • Kafka Consumer 測試結果:
  總耗時/秒 每秒數據量/MB 每秒數據條數
confluent_kafka 39 25.04 12820.51
kafka-python 52 18.78 9615.38
pykafka 335 2.92 1492.54

5.測試結論

  經過測試,在此次測試的三個庫中,生產消息的效率排名是:confluent-kafka > pykafka > kafka-python,消費消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可見 confluent-kafka 的性能是其中最優的,因而選用這個庫進行后續開發。

 

三、多線程模型性能測試

1.編程模型

  經過前面的測試已經知道 confluent-kafka 這個庫的性能是很優秀的了,但如果還需要更高的效率,應該怎么辦呢?當單線程(或者單進程)不能滿足需求時,我們很容易想到使用多線程(或者多進程)來增加並發提高效率,考慮到線程的資源消耗比進程少,所以打算選用多線程來進行開發。那么多線程消費 Kafka 有什么實現方式呢?我想到的有兩種:

  1. 一個線程實現一個 Kafka Consumer,最多可以有 n 個線程同時消費 Topic(其中 n 是該 Topic 下的分區數量);
  2. 多個線程共用一個 Kafka Consumer,此時也可以實例化多個 Consumer 同時消費。

    

  對比這兩種多線程模型:

  • 模型1實現方便,可以保證每個分區有序消費,但 Partition 數量會限制消費能力;
  • 模型2並發度高,可擴展能力強,消費能力不受 Partition 限制。

 2.測試過程

(1)多線程模型1

  測試代碼:

 1 import time
 2 from threading import Thread
 3 from datetime import datetime
 4 from confluent_kafka import Consumer
 5 
 6 
 7 class ChildThread(Thread):
 8     def __init__(self, name, broker, topic):
 9         Thread.__init__(self, name=name)
10         self.con = KafkaConsumerTool(broker, topic)
11 
12     def run(self):
13         self.con.receive_msg()
14 
15 
16 class KafkaConsumerTool:
17     def __init__(self, broker, topic):
18         config = {
19             'bootstrap.servers': broker,
20             'session.timeout.ms': 30000,
21             'auto.offset.reset': 'earliest',
22             'api.version.request': False,
23             'broker.version.fallback': '2.6.0',
24             'group.id': 'test'
25         }
26         self.consumer = Consumer(config)
27         self.topic = topic
28 
29     def receive_msg(self):
30         self.consumer.subscribe([self.topic])
31         print(datetime.now())
32         while True:
33             msg = self.consumer.poll(timeout=30.0)
34             print(msg)
35 
36 
37 if __name__ == '__main__':
38     thread_num = 10
39     threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in range(thread_num)]
40 
41     for i in range(thread_num):
42         threads[i].setDaemon(True)
43     for i in range(thread_num):
44         threads[i].start()

  因為我使用的 Topic 共有8個分區,所以我分別測試了線程數在5個、8個和10個時消費50萬數據所需要的時間,並計算每秒可消費的數據條數。

(2)多線程模型2

  測試代碼:

 1 import time
 2 from datetime import datetime
 3 from confluent_kafka import Consumer
 4 from threadpool import ThreadPool, makeRequests
 5 
 6 
 7 class KafkaConsumerTool:
 8     def __init__(self, broker, topic):
 9         config = {
10             'bootstrap.servers': broker,
11             'session.timeout.ms': 30000,
12             'auto.offset.reset': 'earliest',
13             'api.version.request': False,
14             'broker.version.fallback': '2.6.0',
15             'group.id': 'mini-spider'
16         }
17         self.consumer = Consumer(config)
18         self.topic = topic
19 
20     def receive_msg(self, x):
21         self.consumer.subscribe([self.topic])
22         print(datetime.now())
23         while True:
24             msg = self.consumer.poll(timeout=30.0)
25             print(msg)
26 
27 
28 if __name__ == '__main__':
29     thread_num = 10
30     consumer = KafkaConsumerTool(broker, topic)
31     pool = ThreadPool(thread_num)
32     for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
33         pool.putRequest(r)
34     pool.wait()

  主要使用 threadpool 這個第三方庫來實現線程池,此處當然也可以使用其他庫來實現,這里我分別測試了線程數量在5個和10個時消費50萬數據所需要的時間,並計算每秒可消費的數據條數。

3.測試結果

  • 多線程模型1
 總數據量/萬 線程數量 總耗時/秒 每秒數據條數
50 5 27 18518.51
50 8 24 20833.33
50 10 26 19230.76
  • 多線程模型2
  總數據量/萬 線程數量 總耗時/秒 每秒數據條數
50 5 17 29411.76
50 10 13 38461.53

4.測試結論

  使用多線程可以有效提高 Kafka 的 Consumer 消費數據的效率,而選用線程池共用一個 KafkaConsumer 的消費方式的消費效率更高。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM