文章更新時間:2020/06/08
一、創建Spring boot 工程
創建過程不再描述,創建后的工程結構如下:

POM文件中要加入幾個依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.zhbf</groupId> <artifactId>springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入kafka依賴--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 添加 gson 依賴 --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
啟動SpringbootApplication.java,出現下圖界面則說明工程創建好了:

二、創建kafka生產者類,並通過控制器調用
kafka生產者類
/** * Kafka消息生產類 */ @Log @Component public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.topic.user}") private String topicUser;//topic名稱 /** * 發送用戶消息 * * @param user 用戶信息 */ public void sendUserMessage(User user) { GsonBuilder builder = new GsonBuilder(); builder.setPrettyPrinting(); builder.setDateFormat("yyyy-MM-dd HH:mm:ss"); String message = builder.create().toJson(user); kafkaTemplate.send(topicUser, message); log.info("\n生產消息至Kafka\n" + message); } }
application.yml配置文件

啟動ZK、kafka通訊的服務器broker,並啟動消費者監聽
啟動方式參考上一篇文章,戳這里~
配置一個控制器,即調用kafka生成消息的入口
/** * 測試控制器 * PS:@RestController 注解: 該注解是 @Controller 和 @ResponseBody 注解的合體版 */ @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private User user; @Autowired private KafkaProducer kafkaProducer; @RequestMapping("/createMsg") public void createMsg() { kafkaProducer.sendUserMessage(user); } }
啟動SpringbootApplication,並通過瀏覽器訪問控制器,生成消息



可以看到控制台和消費者窗口都打印了kafka生成的消息。
三、創建kafka消費者類,並通過控制器調用
kafka消費者類
public class KafkaConsumerDemo { @Value("${kafka.topic.user}") private String topicUser;//topic名稱 public void consume() { Properties props = new Properties(); // 必須設置的屬性 props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "group-user"); // 可選設置屬性 //提交方式配置 // 自動提交offset,每1s提交一次(提交后的消息不再消費,避免重復消費問題) props.put("enable.auto.commit", "true");//自動提交offset:true【PS:只有當消息提交后,此消息才不會被再次接受到】 props.put("auto.commit.interval.ms", "1000");//自動提交的間隔 //消費方式配置 /** * earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 * latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 * none: topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常 */ props.put("auto.offset.reset", "earliest ");//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 //拉取消息設置 props.put("max.poll.records", "100 ");//每次poll操作最多拉取多少條消息(一般不主動設置,取默認的就好) //根據上面的配置,新增消費者對象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱topic-user topic consumer.subscribe(Collections.singletonList(topicUser)); while (true) { // 從服務器開始拉取數據 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("成功消費消息:topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); }); } } }
重啟SpringbootApplication,並通過瀏覽器訪問控制器,消費消息


