spring mvc + kafka實戰


  1. 下載zk
    • 地址: http://mirrors.hust.edu.cn/apache/zookeeper/stable/
    • 解壓  tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz 
    • cd apache-zookeeper-3.5.8-bin/
    •  mv conf/ zoo_sample.cfg  conf/zoo.cfg

    •  vim zoo.cfg, 配置日志:
      1. tickTime: zookeeper中使用的基本時間單位, 毫秒
      2. dataDir: 內存數據快照的保存目錄;如果沒有自定義Log也使用該目錄
      3. clientPort: 監聽Cli連接的端口號
  2. 下載 kafa : http://kafka.apache.org/downloads.html

 

       a) 解壓:tar -zxvf kafka_2.12-2.5.0.tgz  

       b)   修改 kafka_2.12-2.5.0/config/server.properties

         

      c) 修改 zookeeper.properties

         

 

     d) 啟動kafka: sh bin/kafka-server-start.sh ./config/server.properties 

       

 

     e) 啟動zk ,啟動server : ./zkServer.sh start 

      

     d)創建kafak topic : sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

      

      檢驗是否創建好了topic: 

./kafka-topics.sh --list --zookeeper localhost:2181

       

 

3.  sping mvc 項目

    a) 配置pom.xml文件

  

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.5.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.101tec</groupId>

            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

 

    b) 創建kafka-producer.xml文件

  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--基本配置 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- kafka服務地址,可能是集群-->
                <entry key="bootstrap.servers" value="localhost:9092,localhost:9093,localhost:9094" />
                <!-- 有可能導致broker接收到重復的消息,默認值為3-->
                <entry key="retries" value="10" />
                <!-- 每次批量發送消息的數量-->
                <entry key="batch.size" value="1638" />
                <!-- 默認0ms,在異步IO線程被觸發后(任何一個topic,partition滿都可以觸發)-->
                <entry key="linger.ms" value="1" />
                <!--producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常 -->
                <entry key="buffer.memory" value="33554432 " />
                <!-- producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號-->
                <entry key="acks" value="all" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 創建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- 創建kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <!--設置對應topic-->
        <property name="defaultTopic" value="test" />
    </bean>
</beans>

  c) 創建 kafka-consumer.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服務地址 -->
                <entry key="bootstrap.servers" value="localhost:9092" />
                <!--Consumer的組ID,相同goup.id的consumer屬於同一個組。 -->
                <entry key="group.id" value="order-beta" />
                <!--如果此值設置為true,consumer會周期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之后將會使用此值作為新開始消費的值。 -->
                <entry key="enable.auto.commit" value="true" />
                <!--網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定 -->
                <entry key="session.timeout.ms" value="15000 " />
                <entry key="key.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer"
                       value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!--指定具體監聽類的bean -->
    <bean id="messageListernerConsumerService" class="test.ke.kafka.KafkaConsumerListener" />

    <!-- 創建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>

   d) 配置spring-mvc文件

   

  <!-- 引入kafka配置文件,根據個人文件位置-->
    <import resource="classpath:kafka-producer.xml"/>
    <import resource="classpath:kafka-consumer.xml"/>
<!-- 使用zookeeper注冊中心暴露服務地址 -->
    <dubbo:application name="test-provider"/>
    <dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181"/>

d) 寫接口

 @Resource
    private KafkaTemplate<Integer, String> kafkaTemplate;

@RequestMapping(value = "/hello.do")
    @ResponseBody
    public void hello(){

        kafkaTemplate.sendDefault("test it");

    }

 

消息消費

package test.ke.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class KafkaConsumerListener implements MessageListener<Integer, String> {
    @SneakyThrows
    @Override
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {

        Object o = consumerRecord.value();
        System.out.println(String.valueOf(o));
    }
}

此時,起服務在瀏覽器調用 http://localhost:18080/hello.do 則在控制台會打印出

 

---------------------

要簡單驗證kafka是否運行正常,則可以在kafka的bin目錄下執行:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test 

然后新起一個terminal, 執行下面的語句,producer發送文案,則上面的consumer就會拿到對應的結果

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

 同時由於我們剛剛起的spring服務監聽的端口號也是9092, 因此服務也會獲取到該消息:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM