springboot整合kafka應用


1、kafka在消息傳遞的使用非常普遍,相對於activemq來說kafka的分布式管理和使用更加靈活。

2、activemq的搭建和使用可以參考:

  activemq搭建和springmvc的整合:http://www.cnblogs.com/ll409546297/p/6898155.html

  springboot和springboot的整合:http://www.cnblogs.com/ll409546297/p/7805072.html

3、kafka的搭建:

  http://www.cnblogs.com/ll409546297/p/7810302.html

4、下面介紹kafka和springboot的整合

  1)目錄結構

  

  2)需要的基礎包:pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.troy</groupId>
    <artifactId>springbootkafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.5.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
    </dependencies>

</project>

  3)基本配置:application.yml

server:
  port: 8090
spring:
  kafka:
    bootstrap-servers: 192.168.5.10:9092 #kafka的訪問地址,多個用","隔開
    consumer:
      enable-auto-commit: true
      group-id: kafka #群組ID
      auto-offset-reset: earliest #啟東時接收沒有接收到的數據

  如果存在集群的話,配置如下

server:
  port: 8090
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: kafka
      auto-offset-reset: earliest
      bootstrap-servers: 192.168.5.11:9092
    producer:
      bootstrap-servers: 192.168.5.10:9092

   4)生產者:KafkaProducer.class

@Component //這個必須加入容器不然,不會執行
@EnableScheduling //這里是為了測試加入定時調度
public class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Scheduled(cron = "00/30 * * * * ?")
    public void send(){
        System.out.println("send data");
        kafkaTemplate.send("topic","kafka data");
        //發送方式很多種可以自己研究一下
    }
}

  5)消費者:KafkaCustomer.class

@Component //同樣這里是必須的
public class KafkaCustomer {

    @KafkaListener(topics = {"topic"})
    public void receive(String message){
        System.out.println("topic========topic");
        System.out.println(message);
    }
}

  6)測試結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM