Spring KafkaTemplate 注解式實現 工廠模式


實現注解式注入kafkaTemplate 生產者和消費者,簡化配置文件

 

目錄

 

 

消費者工廠
/**
 * 消費者工廠
 */
@EnableKafka
@Configuration
public class KafkaConsumerFactory {

    @Autowired
    private ApplicationContext context;

    /**
     * 獲取消費者工廠
     */
    public ConsumerFactory<String, String> consumerFactory(String kafkaBroker) {

        // 消費者配置信息
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * 容器配置
     *
     * @param groupId   組名
     * @param clazz     消費者監聽器
     * @param topicName topicName
     * @return 容器配置
     */
    public ContainerProperties containerProperties(String groupId, Class clazz, String topicName) {

        ContainerProperties containerProperties = new ContainerProperties(topicName);
        containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        containerProperties.setGroupId(groupId);
        containerProperties.setMessageListener(context.getBean(clazz));
        return containerProperties;
    }

    /**
     * 獲取消費容器實例
     *
     * @param kafkaBroker kafka server
     * @param groupId     組名
     * @param clazz       消費者監聽器
     * @param topicName   topicName
     * @param threadCount 消費線程數
     * @return 消息監聽容器
     */
    public ThreadMessageListenerContainer<String, String> kafkaListenerContainer(
            String kafkaBroker, String groupId, Class clazz, String topicName, int threadCount) {

        ThreadMessageListenerContainer<String, String> container
                = new ThreadMessageListenerContainer<>(
                consumerFactory(kafkaBroker), containerProperties(groupId, clazz, topicName));
        container.setConcurrency(threadCount);
        container.getContainerProperties().setPollTimeout(3000);
        return container;
    }

}
 
        
生產者工廠
/**
 * 生產者工廠
 */
@EnableKafka
@Configuration
public class KafkaProducerFactory {

    @Autowired
    private ApplicationContext context;


    /**
     * 獲取生產者工廠
     */
    public ProducerFactory<String, String> producerFactory(String kafkaBroker) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * 注冊生產者實例
     */
    public KafkaTemplate<String, String> kafkaTemplate(String kafkaBroker, String topicName, Class clazz) {

        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory(kafkaBroker), Boolean.FALSE);
        template.setDefaultTopic(topicName);
        template.setProducerListener((ProducerListener<String, String>) context.getBean(clazz));
        return template;
    }
}

 

初始化監聽器、實例

/**
 * kafka 初始化
 */
@Component
public class KafkaInit {

    /**
     * kafka server
     */
    @Value("${kafka.servers}")
    private String kafkaBroker;

    /**
     * 組名
     */
    @Value("${kafka.group}")
    private String groupId;

    /**
     * topicName
     */
    @Value("${kafka.topic}")
    private String topicName;

    /**
     * 消費者工廠
     */
    @Autowired
    private KafkaConsumerFactory kafkaConsumerFactory;

    /**
     * 生產者工廠
     */
    @Autowired
    private KafkaProducerFactory kafkaProducerFactory;


    /**
     * 在服務器加載Servlet的時候運行,並且只會被服務器調用一次
     */
    @PostConstruct
    public void consumer() {

        kafkaConsumerFactory.kafkaListenerContainer(kafkaBroker, groupId, TestConsumerListener.class, topicName, 6)
                .startContainer();
        // 加載消費者listener

    }

    /**
     * 獲取生產者實例
     */
    @Bean("testSender")
    public KafkaTemplate<String, String> testSender() {

        return kafkaProducerFactory.kafkaTemplate(kafkaBroker, topicName, DefaultProducerListener.class);
    }

}

 

用於手動控制容器加載

/**
 * 繼承消息監聽容器
 */
public class ThreadMessageListenerContainer<K, V> extends ConcurrentMessageListenerContainer<K, V> {


    public ThreadMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
    }

    public void startContainer() {
        super.doStart();
    }
}

 

生產者監聽器
/**
 * 默認生產者監聽器
 */
@Component
public class DefaultProducerListener extends ProducerListenerAdapter {

    /**
     * 發送消息成功后調用
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key,
                          Object value, RecordMetadata recordMetadata) {
        super.onSuccess(topic, partition, key, value, recordMetadata);
        System.out.println("消息發送成功!");
    }

    /**
     * 發送消息錯誤后調用
     */
    @Override
    public void onError(String topic, Integer partition, Object key,
                        Object value, Exception exception) {
        super.onError(topic, partition, key, value, exception);
        System.out.println("消息發送失敗!");
    }

    /**
     * 是否開啟發送監聽
     *
     * @return true開啟,false關閉
     */
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }

}

 

消費者監聽器
/**
 * 消費者監聽器
 */
@Component
public class TestConsumerListener implements MessageListener<String, String> {


    /**
     * 消費消息
     *
     * @param record 消息
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {

        System.out.println(record);
    }
}

 

消息發送測試
/**
 * 消息發送
 */
@Component
public class TestProducerSender {

    /**
     * 轉賬隊列發送
     */
    @Autowired
    @Qualifier("testSender")
    private KafkaTemplate kafkaTemplate;

    /**
     * 消息發送測試
     */
    public void sendMessage() {

        kafkaTemplate.sendDefault("message test");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM