1、使用IDEA新建工程,創建工程 springboot-kafka-producer
工程pom.xml文件添加如下依賴:
<!-- Spring for Apache Kafka; no version is declared here (presumably managed by the Spring Boot parent/BOM; verify against the full pom.xml) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Gson, used to serialize the message bean to JSON -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- Lombok (provides @Data and @Log); declared with "provided" scope -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<scope>provided</scope>
</dependency>
2、創建kafka配置類,KafkaProducerConfig
package com.miniooc.kafka.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for the producer side of the Kafka integration.
 *
 * <p>Publishes a {@link KafkaTemplate} bean with {@code String} keys and values, built on a
 * {@link DefaultKafkaProducerFactory} whose broker list comes from the
 * {@code kafka.bootstrap.servers} property.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    /** Broker address list, injected from the {@code kafka.bootstrap.servers} property. */
    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    /**
     * Creates the template that the application uses to publish messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds the producer factory from a fixed set of client settings.
     *
     * <p>This method is not annotated with {@code @Bean}; it is invoked directly by
     * {@link #kafkaTemplate()}.
     *
     * @return a new factory producing String/String Kafka producers
     */
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = new HashMap<>();

        // Where to connect.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Delivery and batching settings.
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        // Keys and values are both sent as plain strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(settings);
    }
}
3、創建kafka生產類,KafkaProducer
package com.miniooc.kafka.producer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.MessageBean;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Publishes {@link MessageBean} instances to Kafka as JSON strings.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@Log
@Component
public class KafkaProducer {

    /**
     * Shared JSON serializer (pretty-printed, dates as {@code yyyy-MM-dd HH:mm:ss}).
     * Gson instances are thread-safe, so one instance is built once and reused
     * instead of constructing a new builder for every message.
     */
    private static final Gson GSON = new GsonBuilder()
            .setPrettyPrinting()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Target topic, injected from the {@code kafka.topic.order} property. */
    @Value("${kafka.topic.order}")
    private String topicOrder;

    /**
     * Serializes the given message to JSON, sends it to the configured topic and logs the payload.
     *
     * @param messageBean the message to send
     */
    public void sendMessage(MessageBean messageBean) {
        // Serialize the message instance to a JSON string.
        String message = GSON.toJson(messageBean);
        // NOTE(review): the value returned by send(...) is discarded, so the log line below
        // records that a send was requested, not that the broker accepted the record.
        kafkaTemplate.send(topicOrder, message);
        // Log the payload that was handed to the template.
        log.info("\nminiooc send message:\n" + message);
    }
}
4、創建消息實體類,MessageBean
package com.miniooc.kafka.message;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
 * Message payload exchanged through Kafka.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@Data
public class MessageBean implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on a
     * compiler-computed default. Static fields are skipped by Gson and by {@code @Data}.
     */
    private static final long serialVersionUID = 1L;

    /** Unique identifier of the message. */
    private String uuid;

    /** Timestamp of the message. */
    private Date date;
}
5、創建消息控制器類,MessageController,主要是方便測試,非必須
package com.miniooc.kafka.controller;
import com.miniooc.kafka.message.MessageBean;
import com.miniooc.kafka.producer.KafkaProducer;
import lombok.extern.java.Log;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Test-only HTTP endpoint that creates a message and pushes it to Kafka.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@Log
@Controller
@RequestMapping("/message")
public class MessageController {

    @Resource
    private KafkaProducer kafkaProducer;

    /**
     * Builds a message with a random UUID and the current time, hands it to the
     * {@link KafkaProducer}, and returns a result map.
     *
     * @return a map with {@code resultCode} (1), {@code resultMsg} ("success") and the
     *         created {@code messageBean}
     */
    @RequestMapping("create")
    @ResponseBody
    public Map<String, Object> create() {
        // Assemble the message.
        MessageBean payload = new MessageBean();
        payload.setUuid(UUID.randomUUID().toString());
        payload.setDate(new Date());

        // Hand the message over to Kafka.
        kafkaProducer.sendMessage(payload);

        // Assemble the response body.
        Map<String, Object> response = new HashMap<>();
        response.put("resultCode", 1);
        response.put("resultMsg", "success");
        response.put("messageBean", payload);
        return response;
    }
}
6、編輯資源文件
# HTTP port of the producer application (the tutorial calls http://localhost:9526/message/create)
server.port=9526
# Application name of the producer project
spring.application.name=kafka-producer
# Kafka broker address, injected into KafkaProducerConfig via @Value
kafka.bootstrap.servers=192.168.88.202:9092
# Topic that KafkaProducer sends messages to
kafka.topic.order=topic-order
# NOTE(review): no producer class shown in this tutorial reads kafka.group.id; only the consumer configuration injects it
kafka.group.id=group-order
7、啟動工程,在瀏覽器中訪問:http://localhost:9526/message/create

工程控制台看到如圖紅框所示的日誌(以「miniooc send message:」開頭,後接消息的 JSON 內容),說明消息發送成功!
8、使用IDEA新建工程,創建工程 springboot-kafka-consumer
工程pom.xml文件添加如下依賴:
<!-- Spring for Apache Kafka; no version is declared here (presumably managed by the Spring Boot parent/BOM; verify against the full pom.xml) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Gson, used to parse the received JSON into the message bean -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- Lombok (provides @Data and @Log); declared with "provided" scope -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<scope>provided</scope>
</dependency>
9、創建kafka配置類,KafkaConsumerConfig
package com.miniooc.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for the consumer side of the Kafka integration.
 *
 * <p>Publishes the listener container factory bean named
 * {@code kafkaListenerContainerFactory}, built on a {@link DefaultKafkaConsumerFactory}
 * for {@code String} keys and values.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, injected from the {@code kafka.bootstrap.servers} property. */
    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    /** Consumer group id, injected from the {@code kafka.group.id} property. */
    @Value("${kafka.group.id}")
    private String groupId;

    /**
     * Creates the container factory used by {@code @KafkaListener} methods.
     *
     * @return a concurrent container factory with concurrency 10 and a 3000 ms poll timeout
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(10);
        containerFactory.getContainerProperties().setPollTimeout(3000);
        return containerFactory;
    }

    /**
     * Builds the consumer factory from a fixed set of client settings.
     *
     * <p>This method is not annotated with {@code @Bean}; it is invoked directly by
     * {@link #kafkaListenerContainerFactory()}.
     *
     * @return a new factory producing String/String Kafka consumers
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = new HashMap<>();

        // Connection and group membership.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        // Offsets are committed automatically.
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // One record per poll.
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        // Keys and values are both read as plain strings.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(settings);
    }
}
10、創建kafka消費類,KafkaConsumer
package com.miniooc.kafka.consumer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.miniooc.kafka.message.MessageBean;
import lombok.extern.java.Log;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Consumes JSON messages from Kafka and logs them.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@Log
@Component
public class KafkaConsumer {

    /**
     * Shared JSON codec (pretty-printed, dates as {@code yyyy-MM-dd HH:mm:ss}).
     * Gson instances are thread-safe, so one instance is built once and reused
     * instead of constructing a new builder for every received message.
     */
    private static final Gson GSON = new GsonBuilder()
            .setPrettyPrinting()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();

    /**
     * Handles one record from the topic named by the {@code kafka.topic.order} property:
     * parses the JSON payload into a {@link MessageBean}, serializes it back to JSON and logs it.
     *
     * @param message the record value, expected to be the JSON form of a {@link MessageBean}
     */
    @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload String message) {
        // Deserialize with the concrete class. The previous raw "new TypeToken() {}" carried no
        // type argument, which Gson rejects with a runtime exception.
        MessageBean messageBean = GSON.fromJson(message, MessageBean.class);
        // Serialize the message instance back to a JSON string.
        String json = GSON.toJson(messageBean);
        // Log the received message.
        log.info("\nminiooc receive message:\n" + json);
    }
}
11、創建消息實體類,MessageBean
package com.miniooc.kafka.message;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
 * Message payload exchanged through Kafka.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 *
 * @author 宋陸
 * @version 1.0.0
 */
@Data
public class MessageBean implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on a
     * compiler-computed default. Static fields are skipped by Gson and by {@code @Data}.
     */
    private static final long serialVersionUID = 1L;

    /** Unique identifier of the message. */
    private String uuid;

    /** Timestamp of the message. */
    private Date date;
}
12、編輯資源文件
server.port=9527
spring.application.name=kafka-consumer
kafka.bootstrap.servers=192.168.88.202:9092
kafka.topic.order=topic-order
kafka.group.id=group-order
13、啟動工程

工程控制台看到如圖紅框所示的日誌(以「miniooc receive message:」開頭,後接消息的 JSON 內容),說明消息接收消費成功!
SpringBoot Kafka 整合完成!