Spring + Kafka + Spring MVC Demo


I. Download (official site: http://kafka.apache.org/downloads.html)

Download any of the release archives listed there.

II. Building the demo: Spring + Spring MVC + Kafka
Install ZooKeeper before you start development.

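1. Start the services. Start ZooKeeper first, because the Kafka broker connects to it on startup and Kafka topics are registered in ZooKeeper: go to the bin directory of the ZooKeeper installation and double-click zkServer.bat.

  Then start the Kafka server from the Kafka installation directory: .\bin\windows\kafka-server-start.bat .\config\server.properties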

 

2. Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test (on Windows, run .\bin\windows\kafka-topics.bat with the same arguments)

 

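3. Add the Maven dependencies. The list was copied straight from the author's demo project, so it includes some dependencies that this demo does not need.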

 

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
  <exclusions>
    <exclusion>
      <artifactId>jmxri</artifactId>
      <groupId>com.sun.jmx</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jms</artifactId>
      <groupId>javax.jms</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jmxtools</artifactId>
      <groupId>com.sun.jdmk</groupId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.1.1.RELEASE</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>4.3.14.RELEASE</version>
</dependency>

<dependency>
  <groupId>org.freemarker</groupId>
  <artifactId>freemarker</artifactId>
  <version>2.3.22</version>
</dependency>

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context-support</artifactId>
  <version>4.1.6.RELEASE</version>
</dependency>

<dependency>
  <groupId>com.github.miemiedev</groupId>
  <artifactId>mybatis-paginator</artifactId>
  <version>1.2.15</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.8.9</version>
</dependency>

<dependency>
  <groupId>carhouse-test</groupId>
  <artifactId>carhouse-test-api</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>dubbo</artifactId>
  <version>2.5.3</version>
  <exclusions>
    <exclusion>
      <artifactId>spring</artifactId>
      <groupId>org.springframework</groupId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.10</version>
</dependency>

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.7.5.RELEASE</version>
</dependency>

 

4. Producer configuration (kafka-producer.xml)

<!-- Basic settings -->
<bean id="producerProperties" class="java.util.HashMap">
  <constructor-arg>
    <map>
      <!-- Kafka broker addresses; may list several brokers of a cluster -->
      <entry key="bootstrap.servers" value="localhost:9092,localhost:9093,localhost:9094" />
      <!-- Number of times a failed send is retried. Retries can cause the broker to receive duplicate messages. The default in the kafka-clients version used here is 0 -->
      <entry key="retries" value="10" />
      <!-- Maximum size in bytes of one batch of records per partition (a byte limit, not a message count). The default is 16384 -->
      <entry key="batch.size" value="1638" />
      <!-- How long the sender waits for more records before sending a batch; the default is 0 ms. A batch that fills up for any topic partition is sent without waiting -->
      <entry key="linger.ms" value="1" />
      <!-- Memory in bytes that the producer can use to buffer records. If records are produced faster than they can be sent to the broker, the producer blocks or throws an exception -->
      <entry key="buffer.memory" value="33554432" />
      <!-- Number of acknowledgements the producer requires from the server before it considers a send complete; "all" waits for all in-sync replicas -->
      <entry key="acks" value="all" />
      <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
      <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
    </map>
  </constructor-arg>
</bean>

<!-- The producerFactory bean needed to create the KafkaTemplate -->
<bean id="producerFactory"
      class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
  <constructor-arg>
    <ref bean="producerProperties" />
  </constructor-arg>
</bean>

<!-- The KafkaTemplate bean; inject it wherever needed and call the template's send methods -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
  <constructor-arg ref="producerFactory" />
  <!-- Topic used by sendDefault -->
  <property name="defaultTopic" value="test" />
</bean>
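
For readers who prefer Java configuration, the producerProperties map above could be expressed roughly as follows. This is only a sketch: the class name ProducerSettings and the method producerProperties() are hypothetical and not part of the original demo, and the values simply mirror the XML.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original demo: mirrors the
// producerProperties bean from kafka-producer.xml as a plain Java map.
final class ProducerSettings {

    private ProducerSettings() {
    }

    static Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("retries", 10);               // resend attempts after a failed send
        props.put("batch.size", 1638);          // bytes per partition batch, not a message count
        props.put("linger.ms", 1);              // wait up to 1 ms for a batch to fill
        props.put("buffer.memory", 33554432L);  // 32 MiB of buffer space
        props.put("acks", "all");               // wait for all in-sync replicas
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

A map like this can be passed to the DefaultKafkaProducerFactory constructor in place of the XML bean, for example new DefaultKafkaProducerFactory<String, String>(ProducerSettings.producerProperties()).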

 

5. Consumer configuration (kafka-consumer.xml)

<bean id="consumerProperties" class="java.util.HashMap">
  <constructor-arg>
    <map>
      <!-- Kafka broker address -->
      <entry key="bootstrap.servers" value="localhost:9092" />
      <!-- Consumer group ID; consumers with the same group.id belong to the same group -->
      <entry key="group.id" value="order-beta" />
      <!-- When true, the consumer periodically commits the offsets it has consumed (the consumer API used here stores them in Kafka, not in ZooKeeper). After a failure and restart, consumption resumes from the committed offset -->
      <entry key="enable.auto.commit" value="true" />
      <!-- Timeout used to detect consumer failures: if the broker receives no heartbeat within this time, the consumer is removed from the group and a rebalance is triggered -->
      <entry key="session.timeout.ms" value="15000" />
      <entry key="key.deserializer"
             value="org.apache.kafka.common.serialization.StringDeserializer" />
      <entry key="value.deserializer"
             value="org.apache.kafka.common.serialization.StringDeserializer" />
    </map>
  </constructor-arg>
</bean>

<!-- The bean of the concrete listener class -->
<bean id="messageListernerConsumerService" class="mq.kafka.KafkaConsumerListener" />

<!-- The consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
  <constructor-arg>
    <ref bean="consumerProperties"/>
  </constructor-arg>
</bean>

<!-- Container settings: the topic to subscribe to and the listener that receives its records -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
  <constructor-arg value="test"/>
  <property name="messageListener" ref="messageListernerConsumerService"/>
</bean>

<!-- The listener container; init-method="doStart" starts it when the bean is initialized -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
  <constructor-arg ref="consumerFactory"/>
  <constructor-arg ref="containerProperties"/>
</bean>
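
The way these consumer beans fit together may be easier to see in Java. The sketch below wires the same three pieces (consumer factory, container properties, listener container) by hand. It assumes the spring-kafka 1.1.x API used in this demo, where ContainerProperties lives in the listener.config package; the class name ConsumerWiring and the method buildContainer are hypothetical.

```java
import java.util.Map;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

import mq.kafka.KafkaConsumerListener;

// Hypothetical helper, not part of the original demo.
public final class ConsumerWiring {

    private ConsumerWiring() {
    }

    // consumerProps is expected to hold the same keys as the consumerProperties bean.
    public static KafkaMessageListenerContainer<String, String> buildContainer(
            Map<String, Object> consumerProps) {
        // The factory creates Kafka consumers from the given settings.
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps);

        // The container properties name the topic and the listener that handles its records.
        ContainerProperties containerProperties = new ContainerProperties("test");
        containerProperties.setMessageListener(new KafkaConsumerListener());

        // The container polls Kafka on its own thread once start() is called.
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }
}
```

Calling start() on the returned container begins consuming, and stop() shuts the consumer down.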

 

6. Additional configuration in spring-mvc.xml

<!-- Expose the service address through the ZooKeeper registry -->
<dubbo:application name="test-provider"/>
<dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181"/>
<!-- Import the Kafka configuration files; adjust the paths to match your project -->
<import resource="classpath:./kafka/kafka-producer.xml"/>
<import resource="classpath:./kafka/kafka-consumer.xml"/>

7. Usage

The simplest case: sending a single message

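// Key and value types match the String serializers in kafka-producer.xml
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "/hello.do")
public void hello() {
    // Sends to the defaultTopic ("test") configured on the KafkaTemplate bean
    kafkaTemplate.sendDefault("test it");
}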
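
The call above does not check whether the send succeeded. As a rough sketch, assuming the spring-kafka 1.x/2.x API in which sendDefault returns a ListenableFuture, a callback can report the outcome. The class LoggingSender is hypothetical and not part of the original demo.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical helper, not part of the original demo.
public class LoggingSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public LoggingSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // sendDefault returns without waiting; the future completes once the broker responds.
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.sendDefault(message);

        future.addCallback(
                // Runs when the broker has acknowledged the record.
                result -> System.out.println("sent to partition "
                        + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                // Runs when the send fails, after any configured retries.
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}
```

The lambdas require Java 8. With acks set to all, as in the producer configuration, the success callback runs only after all in-sync replicas have the record.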

 

Consuming messages

package mq.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

// Registered as the messageListernerConsumerService bean in kafka-consumer.xml
public class KafkaConsumerListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        // Print the value of each record received from the "test" topic
        System.out.println(consumerRecord.value());
    }
}

 

Finally, the console prints "test it".

 

Author: 紫荊王朝
Source: CSDN
Original article: https://blog.csdn.net/wu18296184782/article/details/80164190

