遠程連接 kafka 配置
默認的 kafka 配置是無法遠程訪問的,解決該問題有幾個方案。
方案1
advertised.listeners=PLAINTEXT://IP:9092
注意必須是 ip,不能是 hostname
方案2
advertised.listeners=PLAINTEXT://node0:9092
node0 是 hostname,需在 /etc/hosts 中 添加一行
172.16.89.80 node0
然后 必須在 遠程機(要訪問 kafka 的機器 windows)上修改 hosts文件,
C:\Windows\System32\drivers\etc\hosts
在末尾加上
IP1 節點1
IP2 節點2
節點名與服務器上的 hostname 相同。
測試異常記錄
WARN [Consumer clientId=consumer-1, groupId=console-consumer-4184] Connection to node -1 (/172.16.89.80:9092) could not be established. Broker may not be available . (org.apache.kafka.clients.NetworkClient)
kafka 配置 與 console 啟動的 ip 不一致,如 配置文件中 listeners=PLAINTEXT://172.16.89.80:9092,啟動 是 localhost
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
listeners=PLAINTEXT://172.16.89.80:9092
基礎操作
最簡單的場景,生產者發送,消費者接收
Producer
send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092') # producer = KafkaProducer(bootstrap_servers = 'node0:9092') # 如果這里是 ip,后面必須加 producer.flush() 或者 producer.close(); # 如果是 hostname,則不需要,但是好像有丟包 print(producer.config) # 打印配置信息 topic = '91202' for i in range(200): msg = "msg%d" % i producer.send(topic, msg) producer.flush() # producer.close()
上面的注釋是我親測的結果,至於為什么,我還沒明白,誰能幫我解答
Consumer
topic = '91202' consumer = KafkaConsumer(topic, bootstrap_servers=['172.16.89.80:9092']) # consumer是一個消息隊列,當后台有消息時,這個消息隊列就會自動增加.所以遍歷也總是會有數據,當消息隊列中沒有數據時,就會堵塞等待消息帶來 for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv time.sleep(2)
注意 這種消費方式只能消費實時數據,不能消費歷史數據
如果想讀歷史消息,可以這樣寫
consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=['172.16.89.80:9092'])
auto_offset_reset:重置 offset,earliest 表示移到最早的可用消息,lastest 為最新消息,默認 latest
源碼定義:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}
輸出
生產者不指定分區,消費者輸出如下
91101:1:8: key=None value=msg0 91101:2:12: key=None value=msg1 91101:0:17: key=None value=msg2
3 條消息分到了3個分區
如果不指定分區,一個 topic 的多個消息會被分發到不同的分區;消費者也收到了所有分區的消息
生產者指定分區,消費者輸出如下
91101:2:13: key=None value=msg0 91101:2:14: key=None value=msg1 91101:2:15: key=None value=msg2
3 條消息被分到同一個分區
阻塞發送
kafka send 消息是異步的,即使 send 發生錯誤,程序也不會提示,我們可以通過 阻塞 的方式確認是否發送。
method1-get
import pickle import time from kafka import KafkaProducer from kafka.errors import kafka_errors producer = KafkaProducer( bootstrap_servers=['172.16.89.80:9092'], key_serializer=lambda k: pickle.dumps(k), value_serializer=lambda v: pickle.dumps(v)) start_time = time.time() for i in range(0, 100): future = producer.send(topic="9908", key="num", value=i) # 同步阻塞,通過調用get()方法保證有序性. try: record_metadata = future.get(timeout=20) # print(record_metadata.topic) # print(record_metadata.partition) # print(record_metadata.offset) except kafka_errors as e: print(str(e)) end_time = time.time() time_counts = end_time - start_time print(time_counts)
method2-flush
import pickle import time from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['172.16.89.80:9092'], key_serializer=lambda k: pickle.dumps(k), value_serializer=lambda v: pickle.dumps(v)) start_time = time.time() for i in range(0, 100): future = producer.send('9909', key='num', value=i, partition=0) # 將緩沖區的全部消息push到broker當中 producer.flush() producer.close() end_time = time.time() time_counts = end_time - start_time print(time_counts)
格式化輸入輸出
KafkaProducer 指定生產者輸入的格式轉換方式,key_serializer、value_serializer 用於格式化 key 和 value
KafkaConsumer 指定消費者輸出的格式轉換方式,value_deserializer
指定消費者組
之前講過,不同消費者組消費相同的 topic,互不影響;同一個消費者組的不同成員在同一時刻不能消費同一個 topic 相同的分區;一個線程相當於一個消費者 【驗證】
這里來驗證一下
Producer
不指定分區,20條消息,3個分區
producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092') print(producer.config)##打印配置信息 topic = '91101' for i in range(20): msg = "msg%d" % i producer.send(topic, msg) time.sleep(2) producer.close()
Consumer
開兩個線程,或者兩個窗口,指定相同的消費者組,數組名 隨便寫,一樣即可;
也是實時消費;
topic = '91101' consumer = KafkaConsumer(topic, group_id='111', bootstrap_servers=['172.16.89.80:9092']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
輸出
一個消費者取到 0和1 分區的 value
91101:1:28: key=None value=msg1 91101:0:51: key=None value=msg3 91101:0:52: key=None value=msg5 91101:0:53: key=None value=msg9 91101:0:54: key=None value=msg10 91101:1:29: key=None value=msg11 91101:0:55: key=None value=msg12 91101:1:30: key=None value=msg15 91101:0:56: key=None value=msg16 91101:1:31: key=None value=msg17 91101:0:57: key=None value=msg18
一個消費者取到 2 分區的 value
91101:2:46: key=None value=msg0 91101:2:47: key=None value=msg2 91101:2:48: key=None value=msg4 91101:2:49: key=None value=msg6 91101:2:50: key=None value=msg7 91101:2:51: key=None value=msg8 91101:2:52: key=None value=msg13 91101:2:53: key=None value=msg14 91101:2:54: key=None value=msg19
也就是沒有消費不同的分區,結論正確。
由於只有2個消費者,3個分區,所以必須有一個消費者消費2個分區;
如果是3個消費者應該就不存在這種情況,經驗證,確實如此;
如果有再多的消費者,就分不到消息了;
這也驗證了 之前講的,同一個topic,不推薦多於 partition 個數的 消費者來消費,會造成資源浪費。 【驗證】
小結
1. 消費者不指定組,能收到所有分區的 消息
2. 如果指定了組,同組的不同消費者會消費不同的分區
3. 如果2個分區2個消費者,則一人一個分區 ;如果2個分區3個消費者,則有一個人收不到消息;
4. 如果想消費同一分區,指定不同的組
這種特性可以用作 負載均衡
設定 offset
kafka 提供了 偏移量 offset 的概念,根據偏移量可以讀取 終端開啟前 未接收的數據, 也可以讀取任意位置的數據 【可讀取歷史數據】
為了驗證效果,操作如下
1. 創建一個新的 topic
2. 開啟生產者,不開啟消費者,並發送一些數據
3. 開啟消費者,並指定 分區 和 偏移量,設置偏移量為 0
4. 再次運行消費者,指定相同的分區,偏移量設置為 4
topic = '91107' consumer = KafkaConsumer(group_id='111', bootstrap_servers=['172.16.89.80:9092']) # consumer 指定主題和分區 consumer.assign([TopicPartition(topic, partition=0), TopicPartition(topic, partition=1)]) # # 獲取主題的分區信息, print consumer.partitions_for_topic(topic) # None # 獲取 consumer 的指定 print consumer.assignment() # set([TopicPartition(topic='91107', partition=0), TopicPartition(topic='91107', partition=1)]) # 獲取 consumer 指定分區 的 起始offset print consumer.beginning_offsets(consumer.assignment()) # {TopicPartition(topic=u'91107', partition=0): 0, TopicPartition(topic=u'91107', partition=1): 0} consumer.seek(TopicPartition(topic, partition=1), 4) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print recv
輸出
# offset 0 91107:1:0: key=None value=msg2 91107:1:1: key=None value=msg6 91107:1:2: key=None value=msg10 91107:1:3: key=None value=msg11 91107:1:4: key=None value=msg13 91107:1:5: key=None value=msg19 # offset 4 91107:1:4: key=None value=msg13 91107:1:5: key=None value=msg19
相當於一個文件中從頭到尾 6 行,offset 4 從第 4 行開始讀,【行數從 0 開始】
異常記錄
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.
KafkaConsumer 和 assign 不能同時 指定 topic
小結
1. offset 相當於文件中的 offset 概念
2. 指定 offset 時,必須指定分區,一個分區相當於一個文件,指定分區就相當於指定文件,offset 表示 從文件中 offset 行開始讀
3. offset 功能可以讀取 歷史數據
定時主動拉取
主動拉取可能拉不到
from kafka import KafkaConsumer import time # consumer = KafkaConsumer(bootstrap_servers=['node0:9092']) # 這樣寫只能獲取最新消息 consumer = KafkaConsumer(bootstrap_servers=['node0:9092'], auto_offset_reset='earliest') # 這樣可以從頭拉取 consumer.subscribe(topics=('91201','91202')) while True: msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息 print(msg.keys()) print('*' * 100) time.sleep(2)
主動從多個 topic 拉取數據
輸出
[] **************************************************************************************************** [TopicPartition(topic=u'91201', partition=1), TopicPartition(topic=u'91201', partition=0)] **************************************************************************************************** [TopicPartition(topic=u'91201', partition=0), TopicPartition(topic=u'91201', partition=2)] **************************************************************************************************** [TopicPartition(topic=u'91202', partition=1), TopicPartition(topic=u'91201', partition=2)]
可以看到有的回合沒有拉倒數據
消息掛起與恢復
掛起,消費者不能消費,恢復后,才能消費
from kafka import KafkaConsumer from kafka.structs import TopicPartition import time topic = '91202' consumer = KafkaConsumer(bootstrap_servers=['node0:9092']) consumer.subscribe(topics=(topic)) consumer.topics() consumer.pause(TopicPartition(topic=topic, partition=0)) # pause執行后,consumer不能讀取,直到調用resume后恢復。 num = 0 while True: print(num) print(consumer.paused()) # 獲取當前掛起的消費者 msg = consumer.poll(timeout_ms=5) print(msg) time.sleep(2) num = num + 1 if num == 10: print("resume...") consumer.resume(TopicPartition(topic=topic, partition=0)) print("resume......")
參考資料:
https://www.cnblogs.com/tigerzhouv587/p/11232398.html python 發送kafka
https://kafka-python.readthedocs.io/en/master/usage.html
https://blog.csdn.net/luanpeng825485697/article/details/81036028 好全的資料
https://www.jianshu.com/p/776c188cefa9 從 zookeeper 消費
https://cloud.tencent.com/developer/news/202116 也挺全的資料