【1】Requirement: a Kafka consumer that uses Avro deserialization also needs to deserialize String-typed JSON data. The AvroConfig configuration is as follows:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

/**
 * @author zzx
 * @creat 2020-03-11-20:23
 */
@Configuration
@EnableKafka
public class AvroConfig {

    // Producer configuration omitted; this post focuses on the consumer side.
    // bootstrapServers is assumed to be injected together with the omitted configuration.

    // Consumer configuration for Avro deserialization. Note that each consumer setup
    // below is registered under its own bean name.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Key difference: the value deserializer is the custom AvroDeserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        return props;
    }

    @Bean
    public ConsumerFactory<String, LatData> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new AvroDeserializer<>(LatData.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, LatData> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, LatData> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // Consumer configuration for plain String deserialization
    @Bean
    public Map<String, Object> stringConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Key difference: the value deserializer is the plain StringDeserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(stringConsumerConfigs(), new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());
        return factory;
    }

}
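The configuration above references a custom AvroDeserializer (com.yunda.common.utils.avro.AvroDeserializer) whose source is not shown in this post. The following is only a minimal sketch of what such a deserializer typically looks like, assuming it decodes an Avro binary payload into the SpecificRecordBase subclass passed to its constructor; names and details are illustrative, not the project's actual implementation.

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative sketch; the real AvroDeserializer may differ.
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    private final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Decode the binary payload against the schema generated for the target class
            SpecificDatumReader<T> reader = new SpecificDatumReader<>(targetType);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize Avro payload from topic " + topic, e);
        }
    }
}

Note that because an AvroDeserializer instance is passed directly to DefaultKafkaConsumerFactory in consumerFactory(), that instance takes precedence, and the AvroDeserializer.class entry in the properties map is effectively redundant there.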
【2】Consumer listener: when @KafkaListener(topics = {"xx"}) is used directly, a deserialization error occurs.
@KafkaListener(topics = {LOADING_TOPIC_NAME})
public void revice(String data) throws Exception {
【3】Problem:
ERROR c.y.c.exception.BDExceptionHandler - Can't convert value of class java.lang.String to class com.yunda.common.utils.avro.AvroSerializer specified in value.serializer
org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.lang.String to class com.yunda.common.utils.avro.AvroSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.specific.SpecificRecordBase
【4】Solution: by default, a @KafkaListener that does not specify containerFactory binds to the bean named kafkaListenerContainerFactory, which in this configuration is the Avro factory. Specifying the container factory the listener actually needs resolves the problem:
@KafkaListener(topics = {ENTRY_TOPIC_NAME}, containerFactory = "stringKafkaListenerContainerFactory")
public void revice(String data) throws Exception {
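For completeness, the Avro listener in the same application can reference its container factory explicitly as well, so each listener resolves the factory whose value deserializer matches its payload type. The topic constant (AVRO_TOPIC_NAME) and the method body below are placeholders, not from the original project:

@KafkaListener(topics = {AVRO_TOPIC_NAME}, containerFactory = "kafkaListenerContainerFactory")
public void receiveAvro(LatData data) throws Exception {
    // data has already been deserialized into LatData by AvroDeserializer<LatData>
    // ... process the Avro record ...
}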