實現注解式注入kafkaTemplate 生產者和消費者,簡化配置文件
目錄

消費者工廠
/** * 消費者工廠 */ @EnableKafka @Configuration public class KafkaConsumerFactory { @Autowired private ApplicationContext context; /** * 獲取消費者工廠 */ public ConsumerFactory<String, String> consumerFactory(String kafkaBroker) { // 消費者配置信息 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * 容器配置 * * @param groupId 組名 * @param clazz 消費者監聽器 * @param topicName topicName * @return 容器配置 */ public ContainerProperties containerProperties(String groupId, Class clazz, String topicName) { ContainerProperties containerProperties = new ContainerProperties(topicName); containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD); containerProperties.setGroupId(groupId); containerProperties.setMessageListener(context.getBean(clazz)); return containerProperties; } /** * 獲取消費容器實例 * * @param kafkaBroker kafka server * @param groupId 組名 * @param clazz 消費者監聽器 * @param topicName topicName * @param threadCount 消費線程數 * @return 消息監聽容器 */ public ThreadMessageListenerContainer<String, String> kafkaListenerContainer( String kafkaBroker, String groupId, Class clazz, String topicName, int threadCount) { ThreadMessageListenerContainer<String, String> container = new ThreadMessageListenerContainer<>( consumerFactory(kafkaBroker), containerProperties(groupId, clazz, topicName)); container.setConcurrency(threadCount); container.getContainerProperties().setPollTimeout(3000); return container; } }
生產者工廠
/** * 生產者工廠 */ @EnableKafka @Configuration public class KafkaProducerFactory { @Autowired private ApplicationContext context; /** * 獲取生產者工廠 */ public ProducerFactory<String, String> producerFactory(String kafkaBroker) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } /** * 注冊生產者實例 */ public KafkaTemplate<String, String> kafkaTemplate(String kafkaBroker, String topicName, Class clazz) { KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory(kafkaBroker), Boolean.FALSE); template.setDefaultTopic(topicName); template.setProducerListener((ProducerListener<String, String>) context.getBean(clazz)); return template; } }
初始化監聽器、實例
/** * kafka 初始化 */ @Component public class KafkaInit { /** * kafka server */ @Value("${kafka.servers}") private String kafkaBroker; /** * 組名 */ @Value("${kafka.group}") private String groupId; /** * topicName */ @Value("${kafka.topic}") private String topicName; /** * 消費者工廠 */ @Autowired private KafkaConsumerFactory kafkaConsumerFactory; /** * 生產者工廠 */ @Autowired private KafkaProducerFactory kafkaProducerFactory; /** * 在服務器加載Servlet的時候運行,並且只會被服務器調用一次 */ @PostConstruct public void consumer() { kafkaConsumerFactory.kafkaListenerContainer(kafkaBroker, groupId, TestConsumerListener.class, topicName, 6) .startContainer(); // 加載消費者listener } /** * 獲取生產者實例 */ @Bean("testSender") public KafkaTemplate<String, String> testSender() { return kafkaProducerFactory.kafkaTemplate(kafkaBroker, topicName, DefaultProducerListener.class); } }
用於手動控制容器加載
/** * 繼承消息監聽容器 */ public class ThreadMessageListenerContainer<K, V> extends ConcurrentMessageListenerContainer<K, V> { public ThreadMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) { super(consumerFactory, containerProperties); } public void startContainer() { super.doStart(); } }
生產者監聽器
/** * 默認生產者監聽器 */ @Component public class DefaultProducerListener extends ProducerListenerAdapter { /** * 發送消息成功后調用 */ @Override public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) { super.onSuccess(topic, partition, key, value, recordMetadata); System.out.println("消息發送成功!"); } /** * 發送消息錯誤后調用 */ @Override public void onError(String topic, Integer partition, Object key, Object value, Exception exception) { super.onError(topic, partition, key, value, exception); System.out.println("消息發送失敗!"); } /** * 是否開啟發送監聽 * * @return true開啟,false關閉 */ @Override public boolean isInterestedInSuccess() { return true; } }
消費者監聽器
/** * 消費者監聽器 */ @Component public class TestConsumerListener implements MessageListener<String, String> { /** * 消費消息 * * @param record 消息 */ @Override public void onMessage(ConsumerRecord<String, String> record) { System.out.println(record); } }
消息發送測試
/** * 消息發送 */ @Component public class TestProducerSender { /** * 轉賬隊列發送 */ @Autowired @Qualifier("testSender") private KafkaTemplate kafkaTemplate; /** * 消息發送測試 */ public void sendMessage() { kafkaTemplate.sendDefault("message test"); } }
