概述
一般消息隊列的是實現是支持兩種模式的,即點對點,還有一種是topic發布訂閱者模式,比如ACTIVEMQ。KAFKA也支持這兩種模式,但是實現的原理不一樣。
KAFKA 的消息被讀取后,並不是馬上刪除,這樣就可以重復讀取。kafka 正式利用這種特性實現發布訂閱者模式。
即在發布消息的時候,發布一個topic,可以使用配置多個消費者來消費,消費者使用分組來實現。比如一個topic ,有兩個分組的消費者訂閱。
那么發布一個消息的時候,兩個分組的消費者可以讀取到此條消息。
實現
配置兩組消費者。
分組1
<bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <!-- 配置kafka的broke --> <entry key="bootstrap.servers" value="${kafka.brokerurl}"/> <!-- 配置組--> <entry key="group.id" value="group1"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/> </map> </constructor-arg> </bean> <!-- 創建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean>
注意 這個分組的ID 是 group1
分組2
<bean id="consumerProperties2" class="java.util.HashMap"> <constructor-arg> <map> <!-- 配置kafka的broke --> <entry key="bootstrap.servers" value="${kafka.brokerurl}"/> <!-- 配置組--> <entry key="group.id" value="group2"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/> </map> </constructor-arg> </bean> <!-- 創建consumerFactory bean --> <bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties2"/> </constructor-arg> </bean>
這里配置的分組是2 group2 。
我們使用代碼測試發布消息:
IMessageProducer producer= MessageUtil.getProducer(); LogEntity ent=new LogEntity(); ent.setId("000000001"); ent.setIp("192.168.1.1"); ent.setAction("test"); producer.send("logMessageQueue", ent); return "1";
我們在發布一個消息的時候,兩個分組的消費者都讀取到了這條消息,因此就實現了 發布訂閱者模式。
