kafka 支持發布訂閱


概述

一般消息隊列的是實現是支持兩種模式的,即點對點,還有一種是topic發布訂閱者模式,比如ACTIVEMQ。KAFKA也支持這兩種模式,但是實現的原理不一樣。

KAFKA 的消息被讀取后,並不是馬上刪除,這樣就可以重復讀取。kafka 正式利用這種特性實現發布訂閱者模式。

即在發布消息的時候,發布一個topic,可以使用配置多個消費者來消費,消費者使用分組來實現。比如一個topic ,有兩個分組的消費者訂閱。

那么發布一個消息的時候,兩個分組的消費者可以讀取到此條消息。

實現

配置兩組消費者。

分組1

 <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- 配置kafka的broke -->
                <entry key="bootstrap.servers" value="${kafka.brokerurl}"/>
                <!-- 配置組-->
                <entry key="group.id" value="group1"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 創建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

注意 這個分組的ID 是 group1

分組2

<bean id="consumerProperties2" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- 配置kafka的broke -->
                <entry key="bootstrap.servers" value="${kafka.brokerurl}"/>
                <!-- 配置組-->
                <entry key="group.id" value="group2"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 創建consumerFactory bean -->
    <bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties2"/>
        </constructor-arg>
    </bean>

這里配置的分組是2 group2 。

我們使用代碼測試發布消息:

IMessageProducer producer= MessageUtil.getProducer();
        LogEntity ent=new LogEntity();
        ent.setId("000000001");
        ent.setIp("192.168.1.1");
        ent.setAction("test");
        producer.send("logMessageQueue", ent);
        return "1";

我們在發布一個消息的時候,兩個分組的消費者都讀取到了這條消息,因此就實現了 發布訂閱者模式。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM