spring整合kafka簡單單元測試


kafka簡介: 

Kafka 是一種分布式的,基於發布 / 訂閱的消息系統。主要設計目標如下:

  • 以時間復雜度為 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數據也能保證常數時間復雜度的訪問性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條以上消息的傳輸。
  • 支持 Kafka Server 間的消息分區,及分布式消費,同時保證每個 Partition 內的消息順序傳輸。
  • 同時支持離線數據處理和實時數據處理。
  • Scale out:支持在線水平擴展。

 

一:配置文件

  1.1:pom.xml中加入kafka依賴

<!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.4.RELEASE</version>
        </dependency>

 

  1.2:導入生產者的xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">


    <!--參數配置 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- kafka服務地址,可能是集群 value="localhost:9092,localhost:9093,localhost:9094"-->
                <entry key="bootstrap.servers" value="ip地址:9092" />
    <!--有可能導致broker接收到重復的消息  默認是0-->
                <entry key="retries" value="3" />
                <!--acks=0, 表示生產者在成功寫入消息之前不會等待任何來自服務器的響應-->
                <!--acks=1, 表示只要集群的leader分區副本接收到了消息,就會向生產者發送一個成功響應的ack-->
                <!--acks=all, 表示只要所有參與復制的1節點(ISR列表的副本)全部收到消息時,生產者才會接收到來自服務器的響應-->
                <entry key="ack" value="1" />
                <!--producer可以用來緩存數據的內存大小,如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常-->
                <entry key="buffer.memory" value="33554432" />

                <entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer" />

                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>



    <!-- 創建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 創建kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate"
          class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <!--設置對應topic   test -是后台已經創建好的主題-->
        <property name="defaultTopic" value="test" />
    </bean>

</beans>

  

  1.3:導入消費者xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!--消息監聽器-->
    <bean id="messageListenerContainer"
          class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>

    <!-- 記得修改主題 -->
    <bean id="containerProperties"
          class="org.springframework.kafka.listener.ContainerProperties">
        <!-- 構造函數 就是 主題的參數值 -->
        <constructor-arg value="test" />
        <!-- 自定義個消息監聽器 -->
        <property name="messageListener" ref="myListnener" />
        <!--手工確定-->
        <property name="ackMode" value="MANUAL"></property>
    </bean>

    <!-- -消息監聽器 -->
    <bean id="myListnener" class="com.zcb.kafka.MyMessageListener"></bean>

    <!-- 創建consumerFactory bean -->
    <bean id="consumerFactory"
          class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服務地址 -->
                <entry key="bootstrap.servers" value="ip地址:9092" />
                <!--Consumer的組ID,相同group.id的consumer屬於同一個組。 -->
                <entry key="group.id" value="test-consumer-group" />
                <!--如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。 -->
                <entry key="enable.auto.commit" value="false" />
                <!--當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費-->
                <entry key="auto-offset-reset" value="earliest"></entry>
                <!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 -->
                <entry key="session.timeout.ms" value="15000" />

                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />

                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>


</beans>

  

  1.4:添加監聽器實體類,以及用於啟動消費者的實體類

//消費者監聽器
public class MyMessageListener implements AcknowledgingMessageListener<String,String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        String key = data.key();
        String value = data.value();
        if(key.equals("request")){
            String s = JSON.parseObject(value, String.class);
            System.out.println(s);
            //確定接收
            acknowledgment.acknowledge();
        }
    }
}
//啟動消費者
public class KafKaMain {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("spring-kafka-consumer.xml");
    }
}

 

 

   二:啟動生產者消費者

    2.1:測試類

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-kafka-producer.xml")
public class MyTest {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Test
    public void testKafka(){
        kafkaTemplate.sendDefault("request", JSON.toJSONString("hello,Word!"));
    }

}

    

    2.2:效果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM