Python消费kafka


kfaka还没弄明白,暂时留个脚印。

参考链接:https://www.kingname.info/2020/03/23/operate-kafka-by-python/

import time

from kafka import KafkaConsumer
from kafka import TopicPartition


def debug1():
    consumer = KafkaConsumer(
            'test_topic',
            group_id='group16',
            bootstrap_servers=['10.198.50.218:9092', '10.198.50.33:9092',
                               '10.194.4.5:9092'],
            auto_offset_reset='earliest',  # earliest|latest
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism='PLAIN',
            sasl_plain_username='admin',
            sasl_plain_password='admin',
            api_version=(0, 10),
            receive_buffer_bytes=1024,
            # enable_auto_commit=True,
            # auto_commit_interval_ms=1000,
            # consumer_timeout_ms=2000,
            )
    print(consumer.partitions_for_topic('test_topic'))  # 获取主题的分区信息
    print(consumer.topics())  # 获取主题列表
    print(consumer.subscription())  # 获取当前消费者订阅的主题
    print(consumer.assignment())  # 获取当前消费者topic、分区信息
    print(consumer.beginning_offsets(consumer.assignment()))  # 获取当前消费者可消费的偏移量
    print(consumer.end_offsets(consumer.assignment()))
    message = []
    i = 0
    for msg in consumer:
        print(msg)
        value = msg.value.decode('utf-8')
        message.append(value)
        # time.sleep(1)
        if i >= 8:
            break
        i += 1


if __name__ == '__main__':
    debug1()

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM