This article does not cover Kafka concepts; it is purely hands-on. For background, see:
《分布式消息中间件实践》 (Practice of Distributed Message Middleware)
《Kafka权威指南》 (Kafka: The Definitive Guide)
《spring-kafka-reference》, the official documentation for integrating Kafka with Spring.
The following walks through integrating Kafka with Spring Boot.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.niugang</groupId>
    <artifactId>kafka-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <!-- Inherit from the Spring Boot parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath></relativePath>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <!-- Maven plugins -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties
server.port=8086
########################################### Kafka config ###########################################
spring.kafka.bootstrap-servers=localhost:9092
########################## producer config ##########################
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.retries=0
spring.kafka.producer.buffer-memory=33554432
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
########################## consumer config ##########################
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.group-id=kafka_group_2
spring.kafka.consumer.auto-commit-interval=100
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
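
With spring-kafka on the classpath, Spring Boot's auto-configuration turns the spring.kafka.* properties above into a ProducerFactory, a KafkaTemplate and a ConsumerFactory, so no extra Java configuration is needed in this project. As a rough sketch of what that amounts to on the producer side (the class name ManualProducerConfig and the bean definitions below are illustrative only, not part of this project):

package com.niugang.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Illustrative only: roughly what the auto-configuration builds from the
 * spring.kafka.producer.* properties above. You do not need to declare this.
 */
@Configuration
public class ManualProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}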

Sending messages
package com.niugang.controller;

import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName: SenderConttoller
 * @Description: demonstrates sending messages
 * @author: niugang
 * @date: 2018-11-03 09:58:19
 * @Copyright: 863263957@qq.com. All rights reserved.
 */
@RestController
public class SenderConttoller {

    private Logger logger = LoggerFactory.getLogger(SenderConttoller.class);

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Synchronous send.
     */
    @RequestMapping("syncSendMessage")
    public String syncSendMessage() {
        for (int i = 0; i < 100; i++) {
            try {
                // get() blocks until the broker acknowledges the record
                template.send("kafka-boot", "0", "foo" + i).get();
            } catch (InterruptedException e) {
                logger.error("sync send message fail [{}]", e.getMessage());
                e.printStackTrace();
            } catch (ExecutionException e) {
                logger.error("sync send message fail [{}]", e.getMessage());
                e.printStackTrace();
            }
        }
        return "success";
    }

    /**
     * Asynchronous send.
     */
    @RequestMapping("asyncSendMessage")
    public String sendMessageAsync() {
        for (int i = 0; i < 100; i++) {
            /*
             * SendResult: if the message is written to Kafka successfully,
             * result.getRecordMetadata() returns a RecordMetadata object that
             * carries the topic, the partition and the offset of the record
             * within that partition; see the fields of RecordMetadata.
             */
            ListenableFuture<SendResult<String, String>> send = template.send("kafka-boot", "0", "foo" + i);
            send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("async send message success partition [{}]",
                            result.getRecordMetadata().partition());
                    logger.info("async send message success offset [{}]",
                            result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    logger.error("async send message fail [{}]", ex.getMessage());
                }
            });
        }
        return "success";
    }
}
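
One detail worth noting: in template.send("kafka-boot", "0", "foo" + i) the second argument "0" is the record key, not a partition number; the partition is derived from the key. If you want to pin records to a specific partition, KafkaTemplate also has an overload that takes the partition index explicitly. A minimal sketch inside the controller above (key and value strings are placeholders):

// topic, partition, key, value: forces the record onto partition 0
template.send("kafka-boot", 0, "foo-key", "foo-value");

// topic, key, value: partition is chosen from the key hash (as used above)
template.send("kafka-boot", "0", "foo");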

Consumer
package com.niugang.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ConsumerListener
 * @Description: consumer listener
 * @author: niugang
 * @date: 2018-10-21 14:05:21
 * @Copyright: 863263957@qq.com. All rights reserved.
 */
@Component
public class ConsumerListener {

    private Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(id = "foo", topics = "kafka-boot")
    public void listen1(String foo) {
        logger.info("message content [{}]", foo);
    }
}
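
The listener above only receives the message payload. If the consumer also needs the key, partition or offset, the @KafkaListener method can take the full ConsumerRecord instead of a String. A minimal sketch (the class name RecordConsumerListener is made up for illustration):

package com.niugang.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Illustrative variant: receive the full ConsumerRecord to access key,
 * partition and offset, not just the String payload.
 */
@Component
public class RecordConsumerListener {

    private Logger logger = LoggerFactory.getLogger(RecordConsumerListener.class);

    @KafkaListener(id = "bar", topics = "kafka-boot")
    public void listen(ConsumerRecord<String, String> record) {
        logger.info("key [{}] partition [{}] offset [{}] value [{}]",
                record.key(), record.partition(), record.offset(), record.value());
    }
}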

Startup class
package com.niugang;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.support.SpringBootServletInitializer;

/**
 * @ClassName: KafkaApplication
 * @Description: application entry point
 * @author: niugang
 * @date: 2018-10-20 19:55:38
 * @Copyright: 863263957@qq.com. All rights reserved.
 */
@SpringBootApplication
public class KafkaApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

Source code: https://gitee.com/niugangxy/kafka/tree/master/kafka-spring-boot

