KafkaConsumer是非線程安全的,多線程共享一個KafkaConsumer實例,kafka會有如下異常:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
public class MyConsumer5 { private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class); public static void main(String[] args) throws InterruptedException { Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2"); consumer.subscribe(Collections.singletonList("topic1")); new Thread(() -> { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : consumerRecords) { LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value()); } } }).start(); new Thread(() -> { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord<String, String> record : consumerRecords) { LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value()); } } }).start(); } }
運行結果: