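Create a Maven project named kafka-producer-master. The overall project structure is as follows:
[Figure: project structure of kafka-producer-master]
Maven dependencies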
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gzh.kafka.producer</groupId>
    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-producer-master</name>
    <description>demo project for kafka producer</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
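Configuring the application with application.properties
If you prefer, you can use an application.yml file for the same settings instead. Spring Boot tries to auto-configure the application from the dependencies declared in pom.xml and applies sensible defaults.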
server.port=8000
spring.application.name=kafka-producer
# kafka configuration
spring.kafka.producer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# topic
kafka.app.topic.foo=test20180430
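In the configuration above, the producer application listens on port 8000 and connects to three Kafka brokers, the same servers configured in the earlier article on setting up a ZooKeeper and Kafka cluster on Windows. For more producer-related settings, see the Spring Boot Kafka properties documentation.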
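To make the shape of this file explicit, here is a minimal sketch that loads the same keys with plain java.util.Properties and splits the comma-separated broker list. Spring Boot performs this binding itself, so nothing like this is needed in the real application; the class name ProducerPropertiesSketch and its helper methods are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Illustration only: Spring Boot binds these keys automatically.
final class ProducerPropertiesSketch {

    // A subset of the application.properties entries shown above.
    static final String SAMPLE = String.join("\n",
            "server.port=8000",
            "spring.application.name=kafka-producer",
            "# kafka configuration",
            "spring.kafka.producer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094",
            "# topic",
            "kafka.app.topic.foo=test20180430");

    // Parses properties-file text; lines starting with '#' are comments.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits the bootstrap-servers value into individual host:port entries.
    static List<String> brokers(Properties props) {
        String raw = props.getProperty("spring.kafka.producer.bootstrap-servers", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("port   = " + props.getProperty("server.port"));
        System.out.println("topic  = " + props.getProperty("kafka.app.topic.foo"));
        brokers(props).forEach(b -> System.out.println("broker = " + b));
    }
}
```

The client only needs one reachable broker from this list to discover the rest of the cluster; listing several simply makes startup more tolerant of a single broker being down.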
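Sending Spring Kafka messages with Spring Boot
Spring Kafka provides the KafkaTemplate class, which wraps a Producer and offers high-level operations for sending data to Kafka topics. It supports both asynchronous and synchronous use; the asynchronous send methods return a Future. Spring Boot auto-configures and initializes a KafkaTemplate from the properties in application.properties. To make sending easy to test, this example uses Spring's scheduled tasks: the @EnableScheduling annotation on the class turns scheduling on, and the @Scheduled annotation specifies when messages are sent.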
package com.gzh.kafka.producer.component;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
@EnableScheduling
public class KafkaMessageProducer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.app.topic.foo}")
    private String topic;

    // Fires every 5 seconds (at seconds 0, 5, 10, ... of each minute).
    @Scheduled(cron = "0/5 * * * * ?")
    public void send() {
        String message = "Hello World---" + System.currentTimeMillis();
        LOG.info("topic={}, message={}", topic, message);
        // send() is asynchronous; the callbacks run once the result is known.
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(success -> LOG.info("KafkaMessageProducer: message sent successfully."),
                fail -> LOG.error("KafkaMessageProducer: failed to send message.", fail));
    }
}
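The seconds field of the cron expression above uses the start/step form: begin at second 0 and fire every 5 seconds. The sketch below expands such a field into the concrete seconds it selects. It is a simplified illustration of the notation, not Spring's own cron parser, and the class name CronStepSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the "start/step" cron notation.
final class CronStepSketch {

    // Expands a "start/step" field into every value from start up to max.
    static List<Integer> expand(String field, int max) {
        String[] parts = field.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected start/step, got: " + field);
        }
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        if (start < 0 || start > max || step <= 0) {
            throw new IllegalArgumentException("out of range: " + field);
        }
        List<Integer> values = new ArrayList<>();
        for (int v = start; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        // Seconds range from 0 to 59.
        System.out.println(expand("0/5", 59));
        // prints [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
    }
}
```

With the remaining fields left as wildcards, the scheduled method therefore runs twelve times per minute.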
Creating the producer's main class
package com.gzh.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}
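At this point the Spring Boot and Spring Kafka producer application is complete. Start the ZooKeeper and Kafka servers, then start the producer application and check its console log; output like the following indicates that the integration works.
[Figure: producer application console log]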

The producer can also be made more flexible: instead of a scheduled task, it can send whatever content we want in response to an HTTP request. A simple example follows:
- Message sender class
package com.gzh.kafka.producer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafkaMessageSendService {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.app.topic.foo}")
    private String topic;

    public void send(String message) {
        LOG.info("topic={}, message={}", topic, message);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(success -> LOG.info("KafkaMessageSendService: message sent successfully."),
                fail -> LOG.error("KafkaMessageSendService: failed to send message.", fail));
    }
}
- Controller class that handles the HTTP request
package com.gzh.kafka.producer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.gzh.kafka.producer.service.KafkaMessageSendService;

@RestController
@RequestMapping(value = "send", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class KafkaMessageSendController {

    @Autowired
    private KafkaMessageSendService kafkaMessageSendService;

    @RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
    public String send(@RequestParam(required = true) String message) {
        try {
            // Only exceptions thrown synchronously by send() are caught here.
            kafkaMessageSendService.send(message);
        } catch (Exception e) {
            return "send failed.";
        }
        return message;
    }
}
- Testing the controller endpoint through Swagger
[Figure: Swagger UI request to the sendMessage endpoint]
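One caveat about the controller above: because the send is asynchronous, delivery failures are generally reported through the returned future rather than thrown from the call, so the try/catch may return the message even when delivery later fails. If the HTTP response should reflect the real outcome, one option is to block on the future with a timeout. The sketch below shows that pattern using only the JDK; fakeSend is a hypothetical stand-in for kafkaTemplate.send(topic, message), and the 10-second timeout is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only sketch of waiting for an asynchronous send to finish.
final class BlockingSendSketch {

    // Hypothetical stand-in for kafkaTemplate.send(topic, message).
    static Future<String> fakeSend(String message, boolean brokerDown) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (brokerDown) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete("acked:" + message);
        }
        return future;
    }

    // Waits for the result so that asynchronous failures become visible.
    static String sendAndWait(String message, boolean brokerDown) {
        try {
            fakeSend(message, brokerDown).get(10, TimeUnit.SECONDS);
            return message;
        } catch (ExecutionException | TimeoutException e) {
            return "send failed.";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "send failed.";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("hello", false)); // prints hello
        System.out.println(sendAndWait("hello", true));  // prints send failed.
    }
}
```

The trade-off is latency: each request now waits for the broker acknowledgement, which is usually acceptable for a test endpoint but worth measuring under load.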

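Spring Kafka with Spring Boot: creating a consumer client
Create a Maven project named kafka-consumer-master. The overall project structure is as follows:
[Figure: project structure of kafka-consumer-master]
Maven dependencies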
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gzh.kafka.consumer</groupId>
    <artifactId>consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-consumer-master</name>
    <description>demo project for kafka consumer</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
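Note: when using Spring Kafka, pay close attention to versions, otherwise you will run into all kinds of strange errors. The Spring website lists which Spring Kafka versions correspond to which kafka-clients versions (the kafka-clients version should in turn be kept consistent with the Kafka server version):
[Figure: Spring Kafka and kafka-clients version compatibility table]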

Configuring the application with application.properties
Spring Boot tries to auto-configure the application from the dependencies declared in pom.xml and applies sensible defaults.
server.port=8001
spring.application.name=kafka-consumer
# kafka configuration
# Commit offsets automatically after messages are consumed, so the next run resumes from there
spring.kafka.consumer.enable-auto-commit=true
# Consumer group
spring.kafka.consumer.group-id=guan
# Kafka broker addresses
spring.kafka.consumer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
# Where to start when the group has no committed offset: latest (new messages only); the alternative is earliest
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# topic
kafka.app.topic.foo=test20180430
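In the configuration above, the consumer application listens on port 8001 and connects to the same three Kafka brokers, the servers configured in the earlier article on setting up a ZooKeeper and Kafka cluster on Windows. For more consumer-related settings, see the Spring Boot Kafka properties documentation.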
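The auto-offset-reset setting is easy to misread, so here is a small sketch of the decision it controls: a committed offset for the group always wins, and the policy only applies when there is none. This is a simplified model written for illustration (it ignores the case where a committed offset has fallen out of range), and the class, enum, and method names are hypothetical rather than part of the Kafka client API.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting offset.
final class OffsetResetSketch {

    enum Policy { EARLIEST, LATEST }

    // committed: the group's last committed offset, if any.
    // beginning/end: first available offset and next offset to be written.
    static long startOffset(OptionalLong committed, long beginning, long end, Policy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // resume where the group left off
        }
        return policy == Policy.EARLIEST ? beginning : end;
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..41, so the next write goes to 42.
        System.out.println(startOffset(OptionalLong.empty(), 0, 42, Policy.LATEST));   // prints 42
        System.out.println(startOffset(OptionalLong.empty(), 0, 42, Policy.EARLIEST)); // prints 0
        System.out.println(startOffset(OptionalLong.of(17), 0, 42, Policy.LATEST));    // prints 17
    }
}
```

In practice this means that with latest, a brand-new consumer group will not see messages produced before it first started.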
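Consuming Spring Kafka messages with Spring Boot
When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it. The annotation specifies the topic to consume (it can also specify the consumer group and partitions, and supports matching topics by regular expression). Once the consumer starts, it listens to that topic on the Kafka brokers and consumes messages as they arrive.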
package com.gzh.kafka.consumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    @KafkaListener(topics = {"${kafka.app.topic.foo}"})
    public void receive(@Payload String message, @Headers MessageHeaders headers) {
        LOG.info("KafkaMessageConsumer received message: {}", message);
        // Log every header delivered with the record (topic, partition, offset, ...).
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }
}
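The listener above subscribes to one named topic. For the regular-expression option mentioned earlier, @KafkaListener has a topicPattern attribute that takes a pattern instead of a list of names. The sketch below uses only java.util.regex to show which topic names a given pattern would select, assuming the whole name must match. Apart from test20180430, the topic names are made up, and the class name TopicPatternSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// JDK-only illustration of selecting topics by regular expression.
final class TopicPatternSketch {

    // Returns the topics whose full name matches the regular expression.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Only test20180430 comes from this article; the others are invented.
        List<String> topics = Arrays.asList("test20180430", "test20180501", "orders");
        System.out.println(matching("test2018.*", topics));
        // prints [test20180430, test20180501]
    }
}
```

A pattern subscription also picks up matching topics created later, which is convenient for date-stamped topic names like the one used here.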
Creating the consumer's main class
package com.gzh.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
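The consumer application is now complete, so let's verify that Spring Kafka messages are sent and received. Start the ZooKeeper and Kafka servers first, then the producer application (kafka-producer-master), then the consumer application (kafka-consumer-master), and watch the console logs of both applications:
[Figure: producer and consumer console logs]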



