( 五 )、Integrating Kafka with Spring Boot
1、Introduction
This article is based on Spring Boot 2.6.2 and Kafka 3.0 (the latest release at the time of writing).
Official documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#introduction
GitHub: https://github.com/spring-projects/spring-kafka/tree/main/samples
2、Maven dependency
<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
3、YAML configuration (producer and consumer). For detailed descriptions of the configuration parameters, see the earlier article: Kafka Configuration.
server:
  port: 5001
spring:
  application:
    name: springboot-kafka
  kafka:
    # Kafka server addresses; separate multiple entries with commas
    bootstrap-servers: 192.168.248.130:9092
    # Consumer configuration
    consumer:
      # Consumer client id
      client-id: ${spring.application.name}-consumer
      auto-commit-interval: 1000
      # Where to reset when no committed offset exists
      auto-offset-reset: latest
      # Whether to auto-commit offsets
      enable-auto-commit: true
      # key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value deserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Default consumer group id
      group-id: defaultConsumerGroup
      properties:
        # Fetch request timeout
        request:
          timeout:
            ms: 40000
        # Session timeout (a rebalance is triggered if the consumer sends no heartbeat within this window)
        session:
          timeout:
            ms: 40000
        # Packages trusted for JSON deserialization
        spring:
          json:
            trusted:
              packages: '*'
    # Producer configuration
    producer:
      # Producer client id
      client-id: ${spring.application.name}-producer
      # Ack level: how many partition replicas must confirm before the producer gets an ack (0, 1, all/-1)
      acks: 1
      # Batch size
      batch-size: 16384
      # Producer buffer size
      buffer-memory: 33554432
      # Serializer classes for key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        # Linger time before sending a batch
        linger:
          ms: 0
        # Number of retries
        retries: 0
4、A simple producer and consumer example
4.1、Creating a topic
If you define a NewTopic @Bean in the application context, the KafkaAdmin bean (auto-configured by Spring Boot) can automatically add the topic to the broker; add one such @Bean per topic. Version 2.3 introduced the TopicBuilder class, which makes creating these beans more convenient. The following example shows how:
/**
 * @Author dw
 * @ClassName SimpleProducerConfig
 * @Description
 * @Date 2021/12/24 10:34
 * @Version 1.0
 */
@Configuration
public class SimpleProducerConfig {

    /**
     * Declares a topic (my-topic1) through configuration. Without this, sending to a
     * non-existent topic auto-creates it, but with only 1 partition and 1 replica.
     * partitions: number of partitions: 10
     * replicas: number of replicas: 1
     *
     * @return
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my-topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
}
4.2、Creating a User class
public class User {
    private String userName;
    private Integer age;
    private char sex;
    private String des;
    // getters and setters omitted
}
4.3、Producer: ProducerController
/**
 * @Author dw
 * @ClassName ProducerController
 * @Description
 * @Date 2021/12/24 10:41
 * @Version 1.0
 */
@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping("send")
    public String sendMessage(@RequestBody User user) {
        kafkaTemplate.send("my-topic1", user);
        return "success";
    }
}
4.4、Consumer
/**
 * @Author dw
 * @ClassName SimpleConsumer
 * @Description
 * @Date 2021/12/24 10:45
 * @Version 1.0
 */
@Component
public class SimpleConsumer {

    @KafkaListener(id = "myId", topics = "my-topic1")
    public void listen(User user) {
        System.out.println(user.toString());
    }
}
4.5、Testing
Send a request to localhost:5001/send with a User payload of your choice; the consumer prints the received message to the console.
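For example, the endpoint can be exercised by POSTing a JSON body whose fields match `User` (the field values here are illustrative and mirror the sample output later in this article):

```json
{
  "userName": "张三",
  "age": 29,
  "sex": "男",
  "des": "爱生活, 爱运动"
}
```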
You can also configure a POJO listener with explicit topics and partitions (and, optionally, an initial offset). The following example shows how to do so:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
id: the consumer group id;
topics: the topic(s) to listen to; multiple topics can be listed;
topicPartitions: more fine-grained listening configuration, letting you specify the topic, partition, and offset to listen to.
The listener above means: listen to partitions 0 and 1 of topic1, listen to partition 0 of topic2, and also listen to partition 1 of topic2 starting from offset 100.
5、Multi-method listeners
5.1、Adding a Role entity class
/**
 * @Author dw
 * @ClassName Role
 * @Description
 * @Date 2021/12/26 13:46
 * @Version 1.0
 */
public class Role {
    private String roleName;
    private String roleDes;
    // getters and setters omitted
}
5.2、Adding a multi-method listener consumer class: MultiMethodListener
/**
 * @Author dw
 * @ClassName MultiMethodListener
 * @Description Multi-method listener
 * @Date 2021/12/26 13:45
 * @Version 1.0
 */
@Component
@KafkaListener(id = "multiGroup", topics = { "my-topic1", "my-topic2" })
public class MultiMethodListener {

    @KafkaHandler
    public void user(User user) {
        System.out.println("multiGroup User Received: " + user.toString());
    }

    @KafkaHandler
    public void role(Role role) {
        System.out.println("multiGroup Role Received: " + role.toString());
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("multiGroup unknown Received: " + object);
    }
}
5.3、Adding the producer class: MultiMethodListenerController
/**
 * @Author dw
 * @ClassName MultiMethodListenerController
 * @Description Producer controller for the multi-method listener
 * @Date 2021/12/26 14:21
 * @Version 1.0
 */
@RestController
public class MultiMethodListenerController {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/user")
    public void sendFoo(@RequestBody User user) {
        this.template.send("my-topic1", user);
    }

    @PostMapping(path = "/send/role")
    public void sendBar(@RequestBody Role role) {
        this.template.send("my-topic2", role);
    }

    @PostMapping(path = "/send/unknown/{what}")
    public void sendUnknown(@PathVariable String what) {
        this.template.send("my-topic2", what);
    }
}
5.4、Test results:
multiGroup User Received: User{userName='张三', age=29, sex=男, des='爱生活, 爱运动'}
myId User Received: User{userName='张三', age=29, sex=男, des='爱生活, 爱运动'}
multiGroup Role Received: Role{roleName='管理员', roleDes='角色描述'}
multiGroup unknown Received: is-unknown-message
6、Kafka transactional sends (executeInTransaction)
6.1、Updating the YAML
spring:
  kafka:
    producer:
      # Enable transactions
      transaction-id-prefix: tx.
      acks: all
      retries: 3
    consumer:
      properties:
        isolation.level: read_committed
6.2、Producer controller
/**
 * @Author dw
 * @ClassName KafkaTransactionController
 * @Description Sending inside a Kafka transaction
 * @Date 2021/12/26 15:05
 * @Version 1.0
 */
@RestController
public class KafkaTransactionController {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping("/kafka/inTransaction")
    public void sendInTransaction() {
        // Declared transaction: the exception below prevents the message from going out
        template.executeInTransaction(operations -> {
            operations.send("my-topic2", "test send in transaction");
            throw new RuntimeException("send fail");
        });
    }

    @GetMapping("/kafka/notInTransaction")
    public void sendNotInTransaction() {
        // No transaction: the exception below is thrown, but the message has already been sent
        template.send("my-topic2", "test send not In transaction");
        throw new RuntimeException("fail");
    }
}
7、Kafka non-blocking retries
For details, see the official documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#retry-topic
Create a consumer class, RetryListener:
@Component
public class RetryListener {

    private static final Logger logger = LoggerFactory.getLogger(RetryListener.class);

    /**
     * @RetryableTopic: retry on failure. attempts: number of attempts; backoff: back-off policy.
     * @param in
     */
    @RetryableTopic(attempts = "5",
            backoff = @Backoff(delay = 2_000, maxDelay = 10_000, multiplier = 2))
    @KafkaListener(id = "retryGroup", topics = "my-topic3")
    public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset) {
        this.logger.info("Received: {} from {} @ {}", in, topic, offset);
        if (in.startsWith("fail")) {
            throw new RuntimeException("failed");
        }
    }

    /**
     * Handles dead-lettered records after all retries have failed.
     * @param in
     * @param topic
     * @param offset
     */
    @DltHandler
    public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.OFFSET) long offset) {
        this.logger.info("DLT Received: {} from {} @ {}", in, topic, offset);
    }
}
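To make the back-off policy concrete, here is a minimal plain-Java sketch (class and method names are hypothetical, not part of spring-kafka) of the delay sequence produced by a back-off with delay 2s, multiplier 2, and maxDelay 10s when attempts = 5 (one original delivery plus four retries, each delay doubling and capped at the maximum):

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {
    // Sketch of the retry delays implied by @Backoff(delay = 2000, maxDelay = 10000,
    // multiplier = 2) with attempts = "5": each retry delay is the previous one times
    // the multiplier, capped at maxDelay.
    static List<Long> delays(int attempts, long delay, long maxDelay, double multiplier) {
        List<Long> out = new ArrayList<>();
        long d = delay;
        for (int i = 1; i < attempts; i++) { // retries after the first failed delivery
            out.add(Math.min(d, maxDelay));
            d = (long) (d * multiplier);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(delays(5, 2000, 10000, 2.0)); // [2000, 4000, 8000, 10000]
    }
}
```

So a record that keeps failing is retried after roughly 2s, 4s, 8s, and 10s before landing in the dead-letter topic handled by @DltHandler.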
8、Listener error handling (errorHandler)
Starting with version 2.0, the @KafkaListener annotation has a new attribute: errorHandler. You can use errorHandler to provide the bean name of a KafkaListenerErrorHandler implementation. It has a sub-interface, ConsumerAwareListenerErrorHandler, which gives access to the consumer object through the following method:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
If your error handler implements this interface, you can adjust the offsets accordingly. For example, to reset the offset so that the failed message is replayed, you could do the following:
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}
Usage example:
// Define an error handler and register it with @Bean
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("Consumption error: " + message.getPayload());
        return null;
    };
}

// Reference the error handler's bean name in the errorHandler attribute of @KafkaListener
@KafkaListener(topics = {"topic1"}, errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("Simple consumption - simulated exception");
}

// Batch consumption works the same way; message.getPayload() in the error handler
// also exposes the individual records
@KafkaListener(topics = "topic1", errorHandler = "consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("Consuming a batch...");
    throw new Exception("Batch consumption - simulated exception");
}
9、Message filtering
A message filter can intercept messages before they reach the consumer. In practice, we can apply our own business logic to select the messages we need and hand only those to the KafkaListener, discarding the rest.
To configure filtering, give the listener container factory a RecordFilterStrategy (the filtering policy): when it returns true the message is discarded; when it returns false the message reaches the listener container normally.
@Component
public class KafkaConsumer {

    @Autowired
    ConsumerFactory consumerFactory;

    // Message filter
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // Acknowledge discarded messages so their offsets are still committed
        factory.setAckDiscarded(true);
        // Filtering strategy
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            // Returning true discards the message
            return true;
        });
        return factory;
    }

    // Listener using the filter
    @KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}
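Viewed in isolation, the strategy above is just a predicate over the record value; a minimal plain-Java sketch (the class and field names here are illustrative, not spring-kafka API) makes its discard logic easy to check:

```java
import java.util.function.Predicate;

public class FilterSketch {
    // Mirrors the RecordFilterStrategy above: returning true means "discard",
    // so only records whose value parses to an even number reach the listener.
    static final Predicate<String> DISCARD = value -> Integer.parseInt(value) % 2 != 0;

    public static void main(String[] args) {
        System.out.println(DISCARD.test("3")); // true  -> discarded
        System.out.println(DISCARD.test("4")); // false -> delivered to the listener
    }
}
```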
10、Message forwarding
In real projects we may have a requirement like this: application A takes a message from TopicA, processes it, and forwards it to TopicB, where application B picks it up; that is, one application forwards a message to another once its own processing is done.
With Spring Boot and Kafka, forwarding is simple: annotate the listener method with @SendTo, and the method's return value becomes the forwarded message, as follows:
// Messages received from topic1 are processed and then forwarded to topic2
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value() + "-forward message";
}
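The transformation applied before forwarding is just the listener's return expression; as a small plain-Java sketch (class and method names are illustrative):

```java
public class ForwardSketch {
    // Mirrors the @SendTo listener above: the return value, built from the
    // incoming record's value, becomes the payload sent on to topic2.
    static String forward(String value) {
        return value + "-forward message";
    }

    public static void main(String[] args) {
        System.out.println(forward("hello")); // hello-forward message
    }
}
```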
11、Starting and stopping listeners on a schedule
By default, a listener starts working as soon as the consumer application starts, consuming messages sent to the configured topic. If we instead want the listener to start working at a time of our choosing, or to stop at a time of our choosing, we can use KafkaListenerEndpointRegistry. Below we implement this in two steps:
① disable auto-startup for the listener;
② create two scheduled tasks: one starts the listener at the specified time, the other stops it at the specified time.
Create a scheduled-task class annotated with @EnableScheduling. KafkaListenerEndpointRegistry is already registered as a bean in the Spring IoC container, so it can be injected directly, and the listener container factory is configured so the KafkaListener does not start automatically.
@EnableScheduling
@Component
public class CronTimer {

    /**
     * Methods annotated with @KafkaListener are not registered as beans in the IoC
     * container; they are registered with the KafkaListenerEndpointRegistry, which
     * itself is already a bean in the Spring IoC container.
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    // Listener container factory (auto-startup disabled)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        // Disable auto-startup of the KafkaListener
        container.setAutoStartup(false);
        return container;
    }

    // Listener
    @KafkaListener(id = "timingConsumer", topics = "topic1", containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record) {
        System.out.println("Consumed: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }

    // Start the listener on a schedule
    @Scheduled(cron = "0 42 11 * * ?")
    public void startListener() {
        System.out.println("Starting listener...");
        // "timingConsumer" is the listener id set in the @KafkaListener annotation
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        // registry.getListenerContainer("timingConsumer").resume();
    }

    // Pause the listener on a schedule
    @Scheduled(cron = "0 45 11 * * ?")
    public void shutDownListener() {
        System.out.println("Pausing listener...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}