Spring Kafka(二)操作Topic以及Kafka Tool 2的使用


1. 為什么要手動創建Topic

我們從到到尾都沒有創建過"topic.quick.demo"這個Topic,這是因為KafkaTemplate在發送的時候就已經幫我們完成了創建的操作

,所以我們不需要主動創建"topic.quick.demo"這個Topic,而是交由KafkaTemplate去完成。

但這樣也出現了問題,這種情況創建出來的Topic的Partition(分區)數永遠只有1個,也不會有副本(不知道的回爐重造,Kafka部署集群時使用的),

這就導致了我們在后期不能順利擴展。所以這種情況我們需要使用代碼手動去創建Topic。

2. Kafka Tool 2

Kafka Tool 2是一款Kafka的可視化客戶端工具,可以非常方便的查看Topic的隊列信息以及消費者信息以及kafka節點信息。

直接丟下載地址:http://www.kafkatool.com/download.html

打開之后我們看到的界面如下,非常簡潔,雖然這個工具沒有諸如KafkaOffsetMonitor這種監控工具的功能強大,但勝在操作方便,后期會補充一下監控工具的使用。

3. 使用@Bean注解創建Topic

可以說使用SpringBoot創建Topic是一件非常簡單的事情。

首先我們在config包下創建KafkaInitialConfiguration類,注冊一個類型為NewTopic的Bean即可。

@Configuration public class KafkaInitialConfiguration { //創建TopicName為topic.quick.initial的Topic並設置分區數為8以及副本數為1
 @Bean public NewTopic initialTopic() { return new NewTopic("topic.quick.initial",8, (short) 1 ); } }
View Code

接下來啟動一下SpringBoot項目,啟動完成后打開Kafka Tool 2工具,查看一下剛才創建的隊列是否存在

4. 手動編碼創建Topic

同樣在KafkaInitialConfiguration類中編碼,注冊KafkaAdmin和AdminClient兩個Bean

@Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> props = new HashMap<>(); //配置Kafka實例的連接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); KafkaAdmin admin = new KafkaAdmin(props); return admin; } @Bean public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); }
View Code

接下來在DemoTest測試類中編寫測試方法,這里需要注意一點Topic的新增刪除方法都是異步執行的,為了避免在創建過程中程序關閉導致創建失敗,

所以在代碼最后加了一秒的休眠,執行測試方法我們打開Kafka Tool 2會發現多出了一個"topic.quick.initial2"的Topic

 @Autowired private AdminClient adminClient; @Test public void testCreateTopic() throws InterruptedException { NewTopic topic = new NewTopic("topic.quick.initial2", 1, (short) 1); adminClient.createTopics(Arrays.asList(topic)); Thread.sleep(1000); }
View Code

5. 修改Topic分區數量

為什么要更新Topic呢,例如我們上一章創建的“topic.qucik.demo”只有一個分區,后期我們想增加分區數來提高系統吞吐量,

這樣我們就需要修改一下Topic的分區數了。實現也非常簡單,只需要修改在我們剛才編寫的KafkaInitialConfiguration類的initialTopic()方法,緊接着重啟一下項目即可。

修改分區數並不會導致數據的丟失,但是分區數只能增大不能減小。

 @Bean public NewTopic initialTopic() { return new NewTopic("topic.quick.initial",8, (short) 1 ); } //修改后|
 @Bean public NewTopic initialTopic() { return new NewTopic("topic.quick.initial",11, (short) 1 ); }
View Code

6. 查詢Topic信息

測試類中新建一個testSelectTopicInfo方法 ,使用lambda表達式遍歷輸出

 @Test public void testSelectTopicInfo() throws ExecutionException, InterruptedException { DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic.quick.initial")); result.all().get().forEach((k,v)->System.out.println("k: "+k+" ,v: "+v.toString()+"\n")); }
View Code

這個是輸出結果,里面就包含了各個分區的信息等等

k: topic.quick.initial ,v: (name=topic.quick.initial, internal=false, partitions=(partition=0, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=1, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=2, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=3, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=4, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=5, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=6, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=7, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=8, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=9, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)), (partition=10, leader=admin-PC:9092 (id: 0 rack: null), replicas=admin-PC:9092 (id: 0 rack: null), isr=admin-PC:9092 (id: 0 rack: null)))
View Code

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM