Python消費kafka


kfaka還沒弄明白,暫時留個腳印。

參考鏈接:https://www.kingname.info/2020/03/23/operate-kafka-by-python/

import time

from kafka import KafkaConsumer
from kafka import TopicPartition


def debug1():
    consumer = KafkaConsumer(
            'test_topic',
            group_id='group16',
            bootstrap_servers=['10.198.50.218:9092', '10.198.50.33:9092',
                               '10.194.4.5:9092'],
            auto_offset_reset='earliest',  # earliest|latest
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism='PLAIN',
            sasl_plain_username='admin',
            sasl_plain_password='admin',
            api_version=(0, 10),
            receive_buffer_bytes=1024,
            # enable_auto_commit=True,
            # auto_commit_interval_ms=1000,
            # consumer_timeout_ms=2000,
            )
    print(consumer.partitions_for_topic('test_topic'))  # 獲取主題的分區信息
    print(consumer.topics())  # 獲取主題列表
    print(consumer.subscription())  # 獲取當前消費者訂閱的主題
    print(consumer.assignment())  # 獲取當前消費者topic、分區信息
    print(consumer.beginning_offsets(consumer.assignment()))  # 獲取當前消費者可消費的偏移量
    print(consumer.end_offsets(consumer.assignment()))
    message = []
    i = 0
    for msg in consumer:
        print(msg)
        value = msg.value.decode('utf-8')
        message.append(value)
        # time.sleep(1)
        if i >= 8:
            break
        i += 1


if __name__ == '__main__':
    debug1()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM