- 基础通讯
- Producer.py
import confluent_kafka import time topic = 'confluent-kafka-topic' def confluent_kafka_producer_performance(): topic = 'confluent-kafka-topic' conf = {'bootstrap.servers': '192.168.65.130:9092'} producer = confluent_kafka.Producer(**conf) messages_to_retry = 0 msg_payload = 'This is message' producer_start = time.time() for i in range(10): try: producer.produce(topic, value=msg_payload) print(msg_payload) except BufferError as e: messages_to_retry += 1 # hacky retry messages that over filled the local buffer for i in range(messages_to_retry): producer.poll(0) try: producer.produce(topic, value=msg_payload) except BufferError as e: producer.poll(0) producer.produce(topic, value=msg_payload) producer.flush() return time.time() - producer_start if __name__ == "__main__": time_span = confluent_kafka_producer_performance() print(time_span)
-
- Consumer.py
import confluent_kafka import uuid import time def confluent_kafka_consumer_performance(): topic = 'confluent-kafka-topic' msg_consumed_count = 0 conf = {'bootstrap.servers': '192.168.65.130:9092', 'group.id': uuid.uuid1(), 'session.timeout.ms': 6000, 'default.topic.config': { 'auto.offset.reset': 'earliest' } } consumer = confluent_kafka.Consumer(**conf) consumer_start = time.time() # This is the same as pykafka, subscribing to a topic will start a background thread consumer.subscribe([topic]) while True: msg = consumer.poll(1) if msg: msg_consumed_count += 1 print(msg) if msg_consumed_count >= 10: break consumer_timing = time.time() - consumer_start consumer.close() return consumer_timing if __name__ == "__main__": time_span = confluent_kafka_consumer_performance() print(time_span)
- 分区实现
- pro_partition.py
import confluent_kafka conf = {'bootstrap.servers':'192.168.65.130:9092'} producer = confluent_kafka.Producer(**conf) producer.produce('con_1',key ='key',value='part_0', partition=0) producer.poll(1)
-
- con_partition.py
import confluent_kafka import uuid conf = {'bootstrap.servers':'192.168.65.130:9092', 'group.id':uuid.uuid1()} consumer = confluent_kafka.Consumer(**conf) tp1=confluent_kafka.TopicPartition('con_1', 0) consumer.assign([tp1]) msg = consumer.poll(1) msg.key() msg.value()