在網上碰到的問題,想了下使用現有的API還是可以實現的。 首先,需要引入Kafka服務器端代碼,比如加入Kafka 1.0.0依賴: Maven <dependency> <groupId>org.apache.kafka</groupId> < ...
前段時間在Kafka QQ群中有人問及此事 關於Java consumer如何動態修改topic訂閱的問題。仔細一想才發現這的確是個好問題,因為如果簡單地在另一個線程中直接持有consumer實例然后調用subscribe進行修改,consumer端必然會拋出異常ConcurrentModificationException:KafkaConsumer is not safe for multi ...
2017-06-17 16:55 6 9663 推薦指數:
在網上碰到的問題,想了下使用現有的API還是可以實現的。 首先,需要引入Kafka服務器端代碼,比如加入Kafka 1.0.0依賴: Maven <dependency> <groupId>org.apache.kafka</groupId> < ...
之前寫過如何用服務器端的API代碼來獲取訂閱某topic的所有consumer group,參見這里。使用服務器端的API需要用到kafka.admin.AdminClient類,但是這個類在0.11.0.0版本已經被標記為不推薦使用了,故目前最合適的方式還是通過客戶端API ...
1,查看kafka topic列表,使用--list參數 2,查看kafka特定topic的詳情,使用--topic與--describe參數 列出了test_topic的parition數量、replica因子以及每個partition的leader ...
最近工作中遇到需要使用kafka的場景,測試消費程序啟動后,要莫名的過幾十秒乃至幾分鍾才能成功獲取到到topic的partition和offset,而后開始消費數據,於是學習了一下查看kafka broker里topic和consumer group狀態的相關命令,這里記錄一下。 命令參考 ...
最近工作中遇到需要使用kafka的場景,測試消費程序啟動后,要莫名的過幾十秒乃至幾分鍾才能成功獲取到到topic的partition和offset,而后開始消費數據,於是學習了一下查看kafka broker里topic和consumer group狀態的相關命令,這里記錄一下。 命令參考 ...
1.獲取所有topic package com.example.demo; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; import ...
Kafka提供了兩種Consumer API,分別是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) High Level Consumer API:高度抽象的Kafka消費者API;將底層具體獲取 ...
Kafka提供了兩種Consumer API,分別是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) High Level Consumer API:高度抽象的Kafka消費者API;將底層具體獲取 ...