由於只是簡單地了解和使用了kafka,所以對底層的東西並不做深入的分析,網上有很多資料介紹 kafka的安裝以及它的配置,包括zookeeper集群的搭建。本文是在環境搭建好的情況下,介紹kafka在Maven項目中該如何使用。
1、kafka的配置文件
如果生產者和消費者都在一個模塊里,那就只需要一個配置文件就行啦,如果在不同模塊里的話就是兩個(當然看你用什么環境,一個環境下一個配置文件,自己根據不同的環境進行配置,開發、測試、生產環境除了連接地址不一樣外,其它可以配置成一樣的)。本文的生產者和消費者處於兩個模塊中,所以配置文件是分開的。圖1是生產者的kafka配置,圖2是消費者的kafka配置。
圖1
圖2
圖1:kafka.metadata.broker.list 應該配所有broker的地址和端口號,本文的IP和端口號純屬虛擬,應根據實際情況配置;
kafka.request.required.acks 是消息的確認模式,1則表示發送消息后悔等待leader確認,若為0則表示不保證消息的到達確認,只管發送,-1則是等待leader收到確認,並進行復制操作后,才返回;
kafka.serializer.class 消息的編碼類型;
kafka.topic 創建的topic,如果服務器設置了不允許自動創建topic,則需要在服務器提前創建好。
圖2:kafka.zookeeper.connect 連接zookeeper集群,消費者消費消息的時候依靠zookeeper來保存狀態信息;
kafka.group.id 消費者所在的group,注意,kafka中消息只能被一個group中的一個消費者消費,所以如果需要消費同一組數據,需要配置多個group來消費;
kafka.zookeeper.session.timeout.ms 連接zookeeper等待連接時間;
kafka.zookeeper.sync.time.ms zookeeper的follower同leader的同步時間;
kafka.auto.commit.interval.ms 消費者自動提交offset到zookeeper的時間;
kafka.auto.offset.reset smallest表示從未被消費的消息最小偏移處開始消費;
kafka.consumer.thread.num 消費者需要配置的線程池大小,與預先配置的topic分區數相等;
kafka.topic 要消費消息的topic。
2、生產者
本文將生產者類放在Spring中管理,在Spring配置文件中配置相應的bean,然后在要發送消息的類中使用@Resource注解注入這個生產類即可調用它定義的send方法。圖3和圖4是生產者在Spring中的配置。
圖3
圖4
圖3:引入步驟1中的kafka配置文件,可以讀取相關的配置。
圖4:根據kafka配置實例化一個生產者,並讀取配置的topic。
Spring中配置好后,需要實現對應的Producer類,在類里面定義一個發送消息的方法,在要發送消息時,注入id為producer的bean即可調用。
3、消費者
消費者消費消息的時候需要連接的是zookeeper集群,同生產者一樣,消費者也放在Spring中進行管理。本文使用的是原生API的High Level Consumer,它不需要關心offset的值,只需消費消息即可。圖5是消費者在Spring中的配置。
注意紅色框線部分,這是設置啟動應用時,Spring會自動去執行ConsumerReceive類里的consume方法,即應用啟動就會自動去消費消息。