kafka 教程(三)-遠程訪問


遠程連接 kafka 配置

默認的 kafka 配置是無法遠程訪問的,解決該問題有幾個方案。

方案1

advertised.listeners=PLAINTEXT://IP:9092

注意必須是 ip,不能是 hostname

 

方案2

advertised.listeners=PLAINTEXT://node0:9092

node0 是 hostname,需在 /etc/hosts 中 添加一行

172.16.89.80 node0

 

然后 必須在 遠程機(要訪問 kafka 的機器 windows)上修改 hosts文件,

C:\Windows\System32\drivers\etc\hosts

在末尾加上 

IP1  節點1
IP2  節點2

節點名與服務器上的 hostname 相同。

 

測試異常記錄

WARN [Consumer clientId=consumer-1, groupId=console-consumer-4184] Connection to node -1 (/172.16.89.80:9092) could not be established. Broker may not be available
. (org.apache.kafka.clients.NetworkClient)

kafka 配置 與 console 啟動的 ip 不一致,如 配置文件中 listeners=PLAINTEXT://172.16.89.80:9092,啟動 是  localhost

 

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

listeners=PLAINTEXT://172.16.89.80:9092

 

基礎操作

最簡單的場景,生產者發送,消費者接收

Producer

send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092')
# producer = KafkaProducer(bootstrap_servers = 'node0:9092')
# 如果這里是 ip,后面必須加 producer.flush() 或者 producer.close();
# 如果是 hostname,則不需要,但是好像有丟包
print(producer.config)  # 打印配置信息

topic = '91202'
for i in range(200):
    msg = "msg%d" % i
    producer.send(topic, msg)

producer.flush()
# producer.close()

上面的注釋是我親測的結果,至於為什么,我還沒明白,誰能幫我解答

 

Consumer

topic = '91202'
consumer = KafkaConsumer(topic, bootstrap_servers=['172.16.89.80:9092'])
# consumer是一個消息隊列,當后台有消息時,這個消息隊列就會自動增加.所以遍歷也總是會有數據,當消息隊列中沒有數據時,就會堵塞等待消息帶來
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv
    time.sleep(2)

注意 這種消費方式只能消費實時數據,不能消費歷史數據

 

如果想讀歷史消息,可以這樣寫

consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=['172.16.89.80:9092'])

 auto_offset_reset:重置 offset,earliest 表示移到最早的可用消息,lastest 為最新消息,默認 latest

源碼定義:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

 

輸出

生產者不指定分區,消費者輸出如下

91101:1:8: key=None value=msg0
91101:2:12: key=None value=msg1
91101:0:17: key=None value=msg2

3 條消息分到了3個分區

如果不指定分區,一個 topic 的多個消息會被分發到不同的分區;消費者也收到了所有分區的消息

 

生產者指定分區,消費者輸出如下

91101:2:13: key=None value=msg0
91101:2:14: key=None value=msg1
91101:2:15: key=None value=msg2

3 條消息被分到同一個分區

 

阻塞發送

kafka send 消息是異步的,即使 send 發生錯誤,程序也不會提示,我們可以通過 阻塞 的方式確認是否發送。

 

method1-get

import pickle
import time
from kafka import KafkaProducer
from kafka.errors import kafka_errors
producer = KafkaProducer(
    bootstrap_servers=['172.16.89.80:9092'],
    key_serializer=lambda k: pickle.dumps(k),
    value_serializer=lambda v: pickle.dumps(v))
start_time = time.time()
for i in range(0, 100):
    future = producer.send(topic="9908", key="num", value=i)
    # 同步阻塞,通過調用get()方法保證有序性.
    try:
        record_metadata = future.get(timeout=20)
        # print(record_metadata.topic)
        # print(record_metadata.partition)
        # print(record_metadata.offset)
    except kafka_errors as e:
        print(str(e))
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)

 

method2-flush

import pickle
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.16.89.80:9092'],
                         key_serializer=lambda k: pickle.dumps(k),
                         value_serializer=lambda v: pickle.dumps(v))
start_time = time.time()
for i in range(0, 100):
    future = producer.send('9909', key='num', value=i, partition=0)
# 將緩沖區的全部消息push到broker當中
producer.flush()
producer.close()
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)


格式化輸入輸出

KafkaProducer 指定生產者輸入的格式轉換方式,key_serializer、value_serializer 用於格式化 key 和 value

KafkaConsumer 指定消費者輸出的格式轉換方式,value_deserializer

 

指定消費者組

之前講過,不同消費者組消費相同的 topic,互不影響;同一個消費者組的不同成員在同一時刻不能消費同一個 topic 相同的分區;一個線程相當於一個消費者      【驗證】

這里來驗證一下

 

Producer

不指定分區,20條消息,3個分區

producer = KafkaProducer(bootstrap_servers = '172.16.89.80:9092')
print(producer.config)##打印配置信息

topic = '91101'
for i in range(20):
    msg = "msg%d" % i
    producer.send(topic, msg)
    time.sleep(2)
producer.close()

 

Consumer

開兩個線程,或者兩個窗口,指定相同的消費者組,數組名 隨便寫,一樣即可;

也是實時消費;

topic = '91101'
consumer = KafkaConsumer(topic, group_id='111', bootstrap_servers=['172.16.89.80:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv

 

輸出

一個消費者取到 0和1 分區的 value

91101:1:28: key=None value=msg1
91101:0:51: key=None value=msg3
91101:0:52: key=None value=msg5
91101:0:53: key=None value=msg9
91101:0:54: key=None value=msg10
91101:1:29: key=None value=msg11
91101:0:55: key=None value=msg12
91101:1:30: key=None value=msg15
91101:0:56: key=None value=msg16
91101:1:31: key=None value=msg17
91101:0:57: key=None value=msg18

一個消費者取到 2 分區的 value

91101:2:46: key=None value=msg0
91101:2:47: key=None value=msg2
91101:2:48: key=None value=msg4
91101:2:49: key=None value=msg6
91101:2:50: key=None value=msg7
91101:2:51: key=None value=msg8
91101:2:52: key=None value=msg13
91101:2:53: key=None value=msg14
91101:2:54: key=None value=msg19

也就是沒有消費不同的分區,結論正確。

 

由於只有2個消費者,3個分區,所以必須有一個消費者消費2個分區;

如果是3個消費者應該就不存在這種情況,經驗證,確實如此;

如果有再多的消費者,就分不到消息了;

這也驗證了 之前講的,同一個topic,不推薦多於 partition 個數的 消費者來消費,會造成資源浪費。        【驗證】

 

小結

1. 消費者不指定組,能收到所有分區的 消息

2. 如果指定了組,同組的不同消費者會消費不同的分區

3. 如果2個分區2個消費者,則一人一個分區 ;如果2個分區3個消費者,則有一個人收不到消息;

4. 如果想消費同一分區,指定不同的組

這種特性可以用作 負載均衡

 

設定 offset

kafka 提供了 偏移量 offset 的概念,根據偏移量可以讀取 終端開啟前 未接收的數據, 也可以讀取任意位置的數據  【可讀取歷史數據】

為了驗證效果,操作如下

1. 創建一個新的 topic

2. 開啟生產者,不開啟消費者,並發送一些數據

3. 開啟消費者,並指定 分區 和 偏移量,設置偏移量為 0

4. 再次運行消費者,指定相同的分區,偏移量設置為 4

topic = '91107'
consumer = KafkaConsumer(group_id='111', bootstrap_servers=['172.16.89.80:9092'])

# consumer 指定主題和分區
consumer.assign([TopicPartition(topic, partition=0), TopicPartition(topic, partition=1)])       #

# 獲取主題的分區信息,
print consumer.partitions_for_topic(topic)  # None

# 獲取 consumer 的指定
print consumer.assignment() # set([TopicPartition(topic='91107', partition=0), TopicPartition(topic='91107', partition=1)])

# 獲取 consumer 指定分區 的 起始offset
print consumer.beginning_offsets(consumer.assignment()) 
# {TopicPartition(topic=u'91107', partition=0): 0, TopicPartition(topic=u'91107', partition=1): 0}

consumer.seek(TopicPartition(topic, partition=1), 4)
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv

 

輸出

# offset 0
91107:1:0: key=None value=msg2
91107:1:1: key=None value=msg6
91107:1:2: key=None value=msg10
91107:1:3: key=None value=msg11
91107:1:4: key=None value=msg13
91107:1:5: key=None value=msg19

# offset 4
91107:1:4: key=None value=msg13
91107:1:5: key=None value=msg19

相當於一個文件中從頭到尾 6 行,offset 4 從第 4 行開始讀,【行數從 0 開始】

 

異常記錄

kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.

KafkaConsumer 和 assign 不能同時 指定 topic

 

小結

1. offset 相當於文件中的 offset 概念

2. 指定 offset 時,必須指定分區,一個分區相當於一個文件,指定分區就相當於指定文件,offset 表示 從文件中 offset 行開始讀

3. offset 功能可以讀取 歷史數據

 

定時主動拉取

主動拉取可能拉不到

from kafka import KafkaConsumer
import time

# consumer = KafkaConsumer(bootstrap_servers=['node0:9092'])      # 這樣寫只能獲取最新消息
consumer = KafkaConsumer(bootstrap_servers=['node0:9092'], auto_offset_reset='earliest')        # 這樣可以從頭拉取
consumer.subscribe(topics=('91201','91202'))

while True:
    msg = consumer.poll(timeout_ms=5)   # 從kafka獲取消息
    print(msg.keys())
    print('*' * 100)
    time.sleep(2)

主動從多個 topic 拉取數據

 

輸出

[]
****************************************************************************************************
[TopicPartition(topic=u'91201', partition=1), TopicPartition(topic=u'91201', partition=0)]
****************************************************************************************************
[TopicPartition(topic=u'91201', partition=0), TopicPartition(topic=u'91201', partition=2)]
****************************************************************************************************
[TopicPartition(topic=u'91202', partition=1), TopicPartition(topic=u'91201', partition=2)]

可以看到有的回合沒有拉倒數據

 

消息掛起與恢復

掛起,消費者不能消費,恢復后,才能消費

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

topic = '91202'
consumer = KafkaConsumer(bootstrap_servers=['node0:9092'])
consumer.subscribe(topics=(topic))
consumer.topics()

consumer.pause(TopicPartition(topic=topic, partition=0))  # pause執行后,consumer不能讀取,直到調用resume后恢復。
num = 0
while True:
    print(num)
    print(consumer.paused())   # 獲取當前掛起的消費者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic=topic, partition=0))
        print("resume......")

 

 

 

參考資料:

https://www.cnblogs.com/tigerzhouv587/p/11232398.html  python 發送kafka

https://kafka-python.readthedocs.io/en/master/usage.html

https://blog.csdn.net/luanpeng825485697/article/details/81036028    好全的資料

https://www.jianshu.com/p/776c188cefa9  從 zookeeper 消費

https://cloud.tencent.com/developer/news/202116  也挺全的資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM