kafak-python函數使用詳解


 

  • Consumer是非線程安全的
  • Kafka只保證消息不漏,即at lease once,而不保證消息不重。關鍵點:假如consumer掛了重啟,那它將從committed offset位置(告訴server的消費的位置點)開始重新消費,而不是consume offset位置(真正的消費位置點)。這也就意味着有可能重復消費(自己消費到了某個位置,而后在告訴服務器這個位置時,發送失敗)
  • kafka可以重置commit嗎?給服務器指定任意值為最后消費位置,下次消費從這個指定的位置開始消費。可以,使用commit函數,下文有講。但是需要注意:修改偏移量不會改變當前會話,在新連接里生效
  • subscribe表示訂閱topic,從kafka記錄的offset開始消費。assign表示從指定的offset開始消費。subscribe只指定topic和group,具體消費那個分區,由group coordinator決定,會受rebalance影響;assign必須指定要消費的分區信息,格式是TopicPartitions,不會受rebalance影響。 這兩種方法只能使用一個。
  • kafka自動會從上次沒有消費的地方開始消費
  • 使用kafak自帶的腳本查看偏移量:./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test --offsets
  • 使用了subscribe,就不能使用assign
  • 提交:更新分區的當前位置稱為提交,當前版本(0.10.1.1)用topic ___consumer_offsets 保存提交的偏移量

     

  • 偏移量:消費者在Kafka追蹤到消息在分區里的位置
  • 消費者在崩潰或者有新的消費者加入群組,就會觸發再均衡。這時需要讀取最后一次偏移量,然后從偏移量指定的地方繼續處理。提交的偏移量小於真實的偏移量,消息會被重復處理。大於真實的偏移量,消息會丟失。

 

from kafka.structs import TopicPartition,OffsetAndMetadata
configs = {
            'bootstrap_servers': '10.57.19.60',
            'enable_auto_commit': False,
            'group_id': 'test',
            'api_version': (0, 8, 2),
            'ssl_check_hostname': False,
            'consumer_timeout_ms': 3000,  # 若不指定 consumer_timeout_ms,默認一直循環等待接收,若指定,則超時返回,不再等待
            # 'ssl_certfile': ssl_certfile,
            # 'security_protocol': 'SSL',
            # 'ssl_cafile': ssl_cafile
        }
topics=('test', )
# 注意指定分區將會失去故障轉移/負載均衡的支持,當然也沒有了自動分配分區的功能(因為已經人為指定了嘛)
topic_partition = TopicPartition(topic='test',partition=0) 
# 
consumer = KafkaConsumer(**configs)
# 參數必須是列表,表示訂閱的topic/partition列表
consumer.assign([topic_partition])
# 獲取分給當前用戶的topic/partition信息
consumer.assignment()
# 提交偏移量:可以告知服務器當前偏移量,也可以設置偏移量
consumer.commit({TopicPartition(topic='test', partition=0): OffsetAndMetadata(offset=280, metadata='')})
# 異步提交
consumer.commit_async()
# 獲取服務器的最后確認的偏移量,即最新數據開始讀取的地方
consumer.committed(TopicPartition(topic='test', partition=0))
# 獲取服務器當前最新的偏移量,讀到這個偏移量后,所有數據都讀取完了
consumer.highwater(TopicPartition(topic='test', partition=0))
# 獲取消費的性能
consumer.metrics()
# 獲取某個topic的partition信息
consumer.partitions_for_topic(topic)
# 獲取下一條數據開始讀取的偏移量,即從這個便宜量開始繼續讀取數據
consumer.position(TopicPartition(topic='test', partition=0))
# 從指定偏移量位置開始讀取數據 
consumer.seek(TopicPartition(topic='test', partition=0), 283)
# 從頭開始讀取數據
consumer.seek_to_beginning()
# 從最后開始讀取數據
consumer.seek_to_end()
# 訂閱topic,可以訂閱多個,可以使用正則表達式匹配多個
consumer.subscribe()
# 獲取訂閱的信息,無法獲取使用assign分配的topic/partition信息
consumer.subscription()
# 獲取當前用戶授權的topic信息
consumer.topics()
# 取消消息的訂閱
consumer.unsubscribe()
# 一起消費多條消息,最多等待時間timeout_ms,最多消費max_records
consumer.poll(self, timeout_ms=0, max_records=None) # 獲取指定分區第一個偏移量 consumer.beginning_offsets([topic_partition]) # 獲取指定分區最后一個偏移量,最新的偏移量 consumer.end_offsets([topic_partition]) # 關閉連接 consumer.close() # #consumer.seek(topic_partition,
284) for message in consumer: print(message)

 

重復消費是如何產生的?

消費者設置為自動提交偏移量時,需要同時設置自動提交偏移量的時間間隔。如果消費完若干消息后,還沒有到自動提交偏移量的時間時,應用掛了,則系統記錄的偏移量還是之前的值,那么剛才消費的若干消息,會在應用重連之后重新消費

如何保證不會重復消費?

消費段記錄下發送給服務器的偏移量,獲取最新數據時再判斷這個偏移量是否正確

生產的消息隊列長度,會堆積嗎?

消費的信息隊列長度,會堆積嗎?

生產者速度大於消費者速度怎么處理?

kafka 認證與授權機制

Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三種認證機制。目前支持以下安全措施:

  • clients 與 brokers 認證
  • brokers 與 zookeeper認證
  • 數據傳輸加密  between  brokers and clients, between brokers, or between brokers and tools using SSL
  • 授權clients read/write

 

kafka偏移量的相關配置

enable.auto.commit

true(默認):自動提交偏移量,可以通過配置 auto.commit.interval.ms屬性來控制提交偏移量的頻率。(基於時間間隔)

false:手動控制偏移量。可以在程序邏輯必要的時候提交偏移量,而不是基於時間隔。此時可以進行同步,異步,同步異步組合(參考相應api)。

auto.offset.reset

無法讀取偏移量時候讀取消息的設置

latest(默認):從最新記錄讀取數據。

earliest:從起始位置讀取數據

參考:

1、https://zhuanlan.zhihu.com/p/33238750

2、https://help.aliyun.com/document_detail/68331.html

3、https://blog.csdn.net/xiaoguozi0218/article/details/80513849

4、https://zhuanlan.zhihu.com/p/38330574

5、https://blog.csdn.net/ZhongGuoZhiChuang/article/details/79550570

6、https://help.aliyun.com/document_detail/67233.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM