kafka學習總結014 --- consumer多線程問題


KafkaConsumer是非線程安全的,多線程共享一個KafkaConsumer實例,kafka會有如下異常:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

public class MyConsumer5 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);

    public static void main(String[] args) throws InterruptedException {
        Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
        consumer.subscribe(Collections.singletonList("topic1"));

        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();
    }
}

運行結果:

 

使用方法可見:https://blog.csdn.net/clypm/article/details/80618036


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM