python 發送kafka


python 發送kafka大體有三種方式

1 發送並忘記(不關注是否正常到達,不對返回結果做處理)

import pickle
import time
from kafka import KafkaProducer

# Fire-and-forget producer: messages are handed to the client buffer and the
# broker's response is never inspected.  Keys and values are pickled.
producer = KafkaProducer(bootstrap_servers=['ip:9092'],
                         key_serializer=lambda k: pickle.dumps(k),
                         value_serializer=lambda v: pickle.dumps(v))

start_time = time.time()
for msg_no in range(10000):
    print('------{}---------'.format(msg_no))
    producer.send('test_topic', key='num', value=msg_no, partition=0)

# Push every message still sitting in the client-side buffer to the broker,
# then shut the producer down.
producer.flush()
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)

2  同步發送(通過get方法等待Kafka的響應,判斷消息是否發送成功)

import pickle
import time
from kafka import KafkaProducer
# BUG FIX: the original imported ``kafka_errors``, which is not an exception
# class in kafka-python; the catchable base exception is ``KafkaError``.
from kafka.errors import KafkaError

# Synchronous producer: each send() is followed by a blocking get() on the
# returned future, so delivery success/failure is known per message.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v)
)

start_time = time.time()
for i in range(0, 10000):
    print('------{}---------'.format(i))
    future = producer.send(topic="test_topic", key="num", value=i)
    # Block until the broker acknowledges (or 10 s elapse); this keeps the
    # sends ordered to a degree and surfaces failures immediately.
    try:
        record_metadata = future.get(timeout=10)
        # record_metadata.topic / .partition / .offset are available here.
    except KafkaError as e:
        print(str(e))

# FIX: the original never closed the producer (resource leak).
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)

3  異步發送+回調函數(消息以異步的方式發送,通過回調函數返回消息發送成功/失敗)

import pickle
import time
from kafka import KafkaProducer

# Async producer used below; keys and values are pickled.  pickle.dumps is
# passed directly instead of wrapping it in an equivalent lambda.
producer = KafkaProducer(
    bootstrap_servers=['ip:9092'],
    key_serializer=pickle.dumps,
    value_serializer=pickle.dumps,
)
10
11
def on_send_success(*args, **kwargs):
    """Success callback for an async send.

    Per the caller's note, kafka-python invokes this with the record
    metadata of the delivered message; the positional arguments are
    simply handed back.
    """
    return args
20
21
def on_send_error(*args, **kwargs):
    """Failure callback for an async send.

    Per the caller's note, kafka-python invokes this with the exception
    that caused the failure; the positional arguments are handed back.
    """
    return args
31
32
start_time = time.time()
for seq in range(10000):
    print('------{}---------'.format(seq))
    # On success the callback receives the record metadata; on failure the
    # errback receives the Exception (see the callbacks above).
    future = producer.send(topic="test_topic", key="num", value=seq)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)

# Drain the client buffer, then shut the producer down.
producer.flush()
producer.close()

end_time = time.time()
time_counts = end_time - start_time
print(time_counts)

 

 除此之外,還能發送壓縮數據流

def gzip_compress(msg_str):
    """Gzip-compress *msg_str* (str or bytes) and return the compressed bytes.

    On failure, prints the error and returns None (keeping the original
    best-effort behaviour).
    """
    # Local imports keep this snippet self-contained; the original relied on
    # Python 2's StringIO module, replaced here by io.BytesIO for Python 3.
    import gzip
    import io
    try:
        # gzip operates on bytes in Python 3, so encode text input first.
        if isinstance(msg_str, str):
            msg_str = msg_str.encode('utf-8')
        buf = io.BytesIO()
        with gzip.GzipFile(mode='wb', fileobj=buf) as f:
            f.write(msg_str)
        return buf.getvalue()
    # BUG FIX: ``except BaseException, e`` is Python 2 syntax, and
    # ``"..." + e`` raised TypeError inside the handler; format instead.
    except Exception as e:
        print("Gzip壓縮錯誤: {}".format(e))


def gzip_uncompress(c_data):
    """Gzip-decompress *c_data* (bytes) and return the original bytes.

    On failure, prints the error and returns None (keeping the original
    best-effort behaviour).
    """
    # Local imports keep this snippet self-contained; the original relied on
    # Python 2's StringIO module, replaced here by io.BytesIO for Python 3.
    import gzip
    import io
    try:
        buf = io.BytesIO(c_data)
        with gzip.GzipFile(mode='rb', fileobj=buf) as f:
            return f.read()
    # BUG FIX: ``except BaseException, e`` is Python 2 syntax, and
    # ``"..." + e`` raised TypeError inside the handler; format instead.
    except Exception as e:
        print("Gzip解壓錯誤: {}".format(e))


def send_kafka(topic_name, msg, key=None):
    """Send *msg* (gzip-compressed by the value serializer) to *topic_name*.

    :param topic_name: target Kafka topic.
    :param msg: payload; run through gzip_compress by the producer.
    :param key: optional message key, also gzip-compressed when given.
    :return: the future returned by KafkaProducer.send().

    NOTE(review): a new producer is built and torn down on every call,
    which is expensive — reuse a module-level producer if this is hot.
    """
    servers = ["fdw8.fengjr.inc:9092", "fdw9.fengjr.inc:9092", "fdw10.fengjr.inc:9092"]
    # FIX: the original duplicated the whole KafkaProducer(...) construction
    # in both branches; only the key_serializer differs, so build kwargs once.
    kwargs = {"bootstrap_servers": servers, "value_serializer": gzip_compress}
    if key is not None:
        # Only install a key serializer when there is a key to serialize.
        kwargs["key_serializer"] = gzip_compress
    producer = KafkaProducer(**kwargs)
    # send(..., key=None) is equivalent to omitting the key entirely.
    r = producer.send(topic_name, value=msg, key=key)
    # close() flushes pending messages before shutting the producer down.
    producer.close(timeout=5)
    return r

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM