1、依赖
<dependencies>
    <!-- Spring Boot base starter. No version is given here; presumably it is managed by a
         Spring Boot parent POM / BOM that is not visible in this snippet. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka: provides the KafkaTemplate, @KafkaListener and the
         JsonSerializer / JsonDeserializer classes referenced by this project. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Jackson is declared explicitly with a pinned version; the JSON (de)serializers
         configured for Kafka need it on the classpath.
         NOTE(review): if a Spring Boot BOM is in effect, confirm that pinning 2.11.3
         here does not conflict with the BOM-managed Jackson version. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.11.3</version>
        <scope>compile</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.3</version>
        <scope>compile</scope>
        <optional>true</optional>
    </dependency>
    <!-- Lombok: supplies the @Getter/@Setter/@Builder/@Slf4j annotations used in the code. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Test-scope dependencies. JUnit 4 is declared explicitly because the test class
         uses @RunWith(SpringRunner.class). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2、配置文件
# Kafka broker address shared by the producer and the consumers
spring.kafka.bootstrap-servers=192.168.21.120:9092
# Producer: acknowledgement mode, retry count, and key/value serializers.
# Values are serialized as JSON via Spring Kafka's JsonSerializer.
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer: where to start when no committed offset exists, and key/value deserializers.
# Values are deserialized from JSON via Spring Kafka's JsonDeserializer.
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Packages whose classes JsonDeserializer is allowed to deserialize into
spring.kafka.consumer.properties.spring.json.trusted.packages=com.smile.domain
# By default an error is raised when a topic a listener subscribes to does not exist;
# setting this to false avoids that error.
spring.kafka.listener.missing-topics-fatal=false
# Only log errors from the Spring Kafka and Apache Kafka client packages
logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR
spring.kafka.consumer.properties.spring.json.trusted.packages 配置为 com.smile.domain,即信任该包下的所有 Message 类。出于安全性考虑,JsonDeserializer 在反序列化消息时,只会反序列化成受信任包中的 Message 类,因此该配置项务必配置。
在序列化时,使用了 JsonSerializer 序列化 Message 消息对象,它会在 Kafka 消息 Headers 中写入 TypeId,其值为 Message 消息对应的类全名。
在反序列化时,使用了 JsonDeserializer 反序列化出 Message 消息对象,它会根据 Kafka 消息 Headers 中 TypeId 的值,将消息内容反序列化成对应的 Message 对象。
3、消息
/**
 * Order payload used as the Kafka message value in this example.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok;
 * only {@link #toString()} is written by hand.
 */
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order {

    /** Identifier of the order. */
    private Long orderId;

    /** Identifier of the member the order belongs to. */
    private Long memberId;

    /** Payment type, e.g. {@code "WeChat"}. */
    private String payType;

    /** Time of payment. */
    private Date payTime;

    /**
     * Renders the order as {@code Order{orderId=..., memberId=..., payType='...', payTime=...}}.
     * Unset fields are rendered as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order{");
        text.append("orderId=").append(orderId);
        text.append(", memberId=").append(memberId);
        text.append(", payType='").append(payType).append('\'');
        text.append(", payTime=").append(payTime);
        text.append('}');
        return text.toString();
    }
}
生产者
/**
 * Publishes sample {@link Order} messages to {@code ORDER_TOPIC} through the
 * injected {@link KafkaTemplate}, either blocking for the result or returning
 * the pending future.
 */
@Component
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends a sample order and blocks until the send has completed.
     *
     * @return the result of the completed send
     * @throws ExecutionException   if the send completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public SendResult<String, Object> sendOrderMessageSync() throws ExecutionException, InterruptedException {
        // get() has no timeout, so this waits for as long as the send takes to complete.
        return kafkaTemplate.send(ORDER_TOPIC, buildSampleOrder()).get();
    }

    /**
     * Sends a sample order without waiting for the outcome.
     *
     * @return the future of the send; callers may attach callbacks to it
     */
    public ListenableFuture<SendResult<String, Object>> sendMessageAsync() {
        return kafkaTemplate.send(ORDER_TOPIC, buildSampleOrder());
    }

    /** Builds the fixed sample order used by both send methods, stamped with the current time. */
    private Order buildSampleOrder() {
        return Order.builder()
                .orderId(120000L)
                .memberId(10002L)
                .payType("WeChat")
                .payTime(new Date())
                .build();
    }
}
消费者
/**
 * Listener for {@code ORDER_TOPIC} in the {@code comment-group} consumer group.
 * Each received {@link Order} is written to the log.
 */
@Slf4j
@Component
public class CommentConsumer {

    /** Consumer group id used by this listener. */
    private static final String GROUP_ID = "comment-group";

    /**
     * Logs the received order.
     *
     * @param order the deserialized message value
     */
    @KafkaListener(groupId = GROUP_ID, topics = ORDER_TOPIC)
    public void onMessage(Order order) {
        log.info("【评论】接受消息内容:{}", order);
    }
}

/**
 * Listener for {@code ORDER_TOPIC} in the {@code delivery-group} consumer group.
 * Each received {@link Order} is written to the log.
 */
@Slf4j
@Component
public class DeliveryConsumer {

    /** Consumer group id used by this listener. */
    private static final String GROUP_ID = "delivery-group";

    /**
     * Logs the received order.
     *
     * @param order the deserialized message value
     */
    @KafkaListener(groupId = GROUP_ID, topics = ORDER_TOPIC)
    public void onMessage(Order order) {
        log.info("【物流】接收到消息内容:{}", order);
    }
}
单元测试
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = BootKafkaApplication.class) public class ProducerTest { @Autowired private OrderProducer orderProducer; @Test public void testSyncSend() throws ExecutionException, InterruptedException { SendResult sendResult = orderProducer.sendOrderMessageSync(); log.info("result=topic:{} partition:{} offset:{}", sendResult.getRecordMetadata().topic(), sendResult.getRecordMetadata().partition(), sendResult.getRecordMetadata().offset()); Thread.currentThread().join(10000); } @Test public void testAsyncSend() throws InterruptedException { orderProducer.sendMessageAsync().addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { log.info("发送消息异常:{}", ex); } @Override public void onSuccess(SendResult<String, Object> result) { log.info("回调结果 Result = topic:[{}] , partition:[{}], offset:[{}]", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }); Thread.currentThread().join(5000); } }