搭建Kafka集群,參考:
https://www.cnblogs.com/jonban/p/kafka.html
源碼示例如下:
1、新建 Maven 項目 kafka
2、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.java</groupId> <artifactId>kafka</artifactId> <version>1.0.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> </parent> <dependencies> <!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 熱部署 --> <dependency> <groupId>org.springframework</groupId> <artifactId>springloaded</artifactId> <version>1.2.8.RELEASE</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <finalName>${project.artifactId}</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
3、KafkaStarter.java
package com.java; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * 主啟動類 * * @author Logan * @version 1.0.0 * @createDate 2019-05-07 * */ @SpringBootApplication public class KafkaStarter { public static void main(String[] args) { SpringApplication.run(KafkaStarter.class, args); } }
4、MessageHandler.java
package com.java.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * * 任務處理器,監聽kafka隊列中的消息,消費並處理 * * @author Logan * @version 1.0.0 * @createDate 2019-05-07 * */ @Component public class MessageHandler { @KafkaListener(topics = { "test-topic" }) public void handle(String message) { System.out.println("[ 處理器開始處理消息 ]" + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); System.out.println("[ 處理器處理消息完成 ]" + System.currentTimeMillis()); } @KafkaListener(topics = { "test-topic" }) public void handle(ConsumerRecord<String, String> record) { System.out.println("[ 處理器開始處理消息 ]" + System.currentTimeMillis()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(record); System.out.println("[ 處理器處理消息完成 ]" + System.currentTimeMillis()); } }
5、SendMessageController.java
package com.java.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * 發送消息類 * * @author Logan * @version 1.0.0 * @createDate 2019-05-07 * */ @RestController public class SendMessageController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private String topic = "test-topic"; @GetMapping("/send") public String send(String params) { System.out.println("[ 收到請求 ]"); kafkaTemplate.send(topic, params); System.out.println("[ 返回響應 ]"); return "您的任務已提交"; } }
6、application.properties
#生產者配置,參見org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer
spring.kafka.producer.bootstrapServers=s1:9092,s2:9092,s3:9092
#消費者配置,參見org.springframework.boot.autoconfigure.kafka.KafkaProperties.Consumer
spring.kafka.consumer.bootstrapServers=s1:9092,s2:9092,s3:9092
spring.kafka.consumer.groupId=kafka-test
spring.kafka.consumer.autoOffsetReset=latest
spring.kafka.consumer.enableAutoCommit=true
7、運行KafkaStarter.java 啟動
瀏覽器輸入:http://127.0.0.1:8080/send?params=Good
可以向主題中發送消息Good,(params參數就是發送的內容)
程序中的消費者會監聽到消息並開始處理
在Kafka消費者控制台可以監聽到同樣的消息
同樣,在Kafka生產者控制台發送消息到主題 test-topic ,
程序中的消費者也會監聽到消息並開始處理。
結論:測試生產者和消費者功能一切正常!
Spring boot 集成Kafka
.