Python 發送 Kafka 大體有三種方式
1 發送並忘記(不關注是否正常到達,不對返回結果做處理)
import pickle
import time
from kafka import KafkaProducer

# Fire-and-forget: messages are handed to the producer's buffer and we never
# inspect the broker's response.  pickle.dumps is used directly as the
# serializer for both keys and values.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=pickle.dumps,
    value_serializer=pickle.dumps,
)

t0 = time.time()
for msg_no in range(10000):
    print('------{}---------'.format(msg_no))
    # Send to partition 0 of the topic; the returned future is ignored.
    producer.send('test_topic', key='num', value=msg_no, partition=0)

# Push everything still sitting in the local buffer out to the broker,
# then release the producer's resources.
producer.flush()
producer.close()

elapsed = time.time() - t0
print(elapsed)
2 同步發送(通過get方法等待Kafka的響應,判斷消息是否發送成功)
import pickle
import time
from kafka import KafkaProducer
# Bug fix: the original `from kafka.errors import kafka_errors` raises
# ImportError — kafka.errors exports the KafkaError exception class,
# not a name called `kafka_errors`.
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

start_time = time.time()
for i in range(10000):
    print('------{}---------'.format(i))
    future = producer.send(topic="test_topic", key="num", value=i)
    # Synchronous send: block on get() until the broker acknowledges the
    # message (or the 10 s timeout expires), which also guarantees ordering.
    try:
        record_metadata = future.get(timeout=10)
        # record_metadata carries .topic, .partition and .offset on success.
    except KafkaError as e:
        print(str(e))

# Bug fix: the original never closed the producer; close() flushes any
# buffered messages and releases the connection.
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
3 異步發送+回調函數(消息以異步的方式發送,通過回調函數返回消息發送成功/失敗)
import pickle
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)


def on_send_success(record_metadata, *args, **kwargs):
    """Success callback: invoked with the RecordMetadata of the
    acknowledged message.

    :param record_metadata: metadata (topic/partition/offset) of the send.
    :return: the metadata, so a chained callback could inspect it.
    """
    return record_metadata


def on_send_error(excp, *args, **kwargs):
    """Error callback: invoked with the exception that caused the send
    to fail.

    Bug fix: the original silently returned its arguments, discarding the
    very error this callback exists to surface — report it instead.

    :param excp: the exception raised for the failed send.
    :return: the exception, after logging it.
    """
    print(str(excp))
    return excp


start_time = time.time()
for i in range(10000):
    print('------{}---------'.format(i))
    # Asynchronous send: on success the callback receives record_metadata,
    # on failure the errback receives the Exception.
    producer.send(
        topic="test_topic", key="num", value=i
    ).add_callback(on_send_success).add_errback(on_send_error)

# Flush the buffered messages to the broker, then release resources.
producer.flush()
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
除此之外,還能發送壓縮數據流
import gzip
import io


def gzip_compress(msg):
    """Gzip-compress a message and return the compressed bytes.

    Ported from Python 2: the original used StringIO.StringIO and the
    Py2-only `except BaseException, e` syntax, and `"..." + e` raised a
    TypeError when formatting the error.

    :param msg: the payload; str is encoded as UTF-8, bytes used as-is.
    :return: gzip-compressed bytes, or None if compression failed.
    """
    if isinstance(msg, str):
        msg = msg.encode('utf-8')
    try:
        buf = io.BytesIO()
        with gzip.GzipFile(mode='wb', fileobj=buf) as f:
            f.write(msg)
        return buf.getvalue()
    except OSError as e:
        # gzip/zlib failures surface as OSError; keep best-effort behavior.
        print("Gzip壓縮錯誤: {}".format(e))


def gzip_uncompress(c_data):
    """Decompress gzip-compressed bytes.

    :param c_data: bytes previously produced by gzip_compress.
    :return: the decompressed bytes, or None if decompression failed.
    """
    try:
        buf = io.BytesIO(c_data)
        with gzip.GzipFile(mode='rb', fileobj=buf) as f:
            return f.read()
    except OSError as e:
        print("Gzip解壓錯誤: {}".format(e))


def send_kafka(topic_name, msg, key=None):
    """Send a gzip-compressed message to a Kafka topic.

    :param topic_name: target topic.
    :param msg: message value; run through gzip_compress before sending.
    :param key: optional message key, also gzip-compressed when given.
    :return: the FutureRecordMetadata returned by producer.send().
    """
    servers = ["fdw8.fengjr.inc:9092", "fdw9.fengjr.inc:9092", "fdw10.fengjr.inc:9092"]
    if key is not None:
        producer = KafkaProducer(bootstrap_servers=servers,
                                 key_serializer=gzip_compress,
                                 value_serializer=gzip_compress)
        r = producer.send(topic_name, value=msg, key=key)
    else:
        producer = KafkaProducer(bootstrap_servers=servers,
                                 value_serializer=gzip_compress)
        r = producer.send(topic_name, value=msg)
    # close() flushes pending messages before releasing the connection.
    producer.close(timeout=5)
    return r