SpringBoot整合Kafka


一、准備工作

提前啟動zk,kafka,並且創建一個Topic("Hello-Kafk")

bin/kafka-topics.sh --create --zookeeper 192.168.204.139:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

確保你的kafka能夠訪問,如果訪問不了,需要打開外網訪問。
config/server.properties

advertised.listeners=PLAINTEXT://192.168.239.128:9092

Maven 依賴

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

 

二、項目結構
為了更加體現實際開發需求,一般生產者都是在調用某些接口的服務處理完邏輯之后然后往kafka里面扔數據,然后有一個消費者不停的監控這個Topic,然后處理數據,所以這里把生產者作為一個接口,消費者放到kafka這個目錄下,注意@Component注解,不然掃描不到@KafkaListener

 

 

 

三、具體實現代碼
SpringBoot配置文件 application.properties

#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=192.168.204.139:9092

#=============== provider  =======================
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默認消費者group id
#spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生產者

package cn.kafka.demo;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Properties;

/**
 * 測試kafka生產者
 */
@RestController
@RequestMapping("/kafka")
public class TestKafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(String msg){
        kafkaTemplate.send("Hello-Kafka",msg);
        return "success";
    }
}

 

消費者
這里的消費者會監聽這個主題,有消息就會執行,不需要進行while(true)

 

package cn.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka消費者測試
 */
@Component
public class TestConsumer {

    @KafkaListener(topics = "Hello-Kafka")
    public void listen (ConsumerRecord<?, ?> record) throws Exception {
        System.err.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

 

項目啟動類

 

package cn.kafka.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApplication{

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}

四、測試
運行項目,執行:http://localhost:8080/kafka/send?msg=hello

控制台輸出:

topic = test_topic, offset = 19, value = hello 

 

為了體現消費者不止執行一次就結束,再調用一次接口:
http://localhost:8080/kafka/send?msg=kafka

topic = test_topic, offset = 20, value = kafka 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM