Kafka + SpringData + (Avro & String): resolving the "Can't convert value of class java.lang.String" error


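【1】Requirement: an application whose Kafka consumer deserializes Avro values also needs to deserialize String values that carry JSON data. The AvroConfig configuration is as follows: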

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// AvroDeserializer and LatData are project classes; import them from your own packages.

/**
 * @author zzx
 * @create 2020-03-11-20:23
 */
@Configuration
@EnableKafka
public class AvroConfig {

    // Assumption: the original post omits this declaration; the property key is illustrative.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer configuration omitted; this post is about the consumer side.
    // Avro consumer configuration. The two consumer setups differ mainly in their bean names.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Main difference: values are deserialized as Avro
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        return props;
    }

    @Bean
    public ConsumerFactory<String, LatData> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new AvroDeserializer<>(LatData.class));
    }

    // This bean carries the default name that @KafkaListener looks up
    // when no containerFactory is given.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, LatData> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, LatData> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // String consumer configuration.
    // Unlike consumerConfigs(), no group.id is set here; supply one on the listener if needed.
    @Bean
    public Map<String, Object> stringConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Main difference: values are deserialized as plain strings
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        return props;
    }

    // Typed instead of raw so the compiler can check the key and value types.
    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(stringConsumerConfigs(), new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());
        return factory;
    }

}

【2】Consumer listener: using @KafkaListener(topics = {"xx"}) directly, without naming a container factory, results in a deserialization error.

@KafkaListener(topics = {LOADING_TOPIC_NAME})
public void receive(String data) throws Exception {
    // ...
}

【3】Problem:

ERROR c.y.c.exception.BDExceptionHandler - Can't convert value of class java.lang.String to class com.yunda.common.utils.avro.AvroSerializer specified in value.serializer
org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.lang.String to class com.yunda.common.utils.avro.AvroSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.specific.SpecificRecordBase
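
The "Caused by" line is an ordinary cast failure: a String value reached a component that is typed for Avro records. The following is a minimal JDK-only sketch of that mechanism, not the real Kafka or Avro code. ValueSerializer, RecordBase, LatRecord and AvroLikeSerializer are hypothetical stand-ins invented for illustration; the only claim is that generic type erasure lets a wrongly typed value through at compile time and the cast then fails at runtime.

```java
import java.nio.charset.StandardCharsets;

class CastFailureDemo {
    // Passes a value through an erased (raw) serializer reference, which is
    // how a framework holding untyped configuration would typically call it.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String trySerialize(Object value) {
        ValueSerializer raw = new AvroLikeSerializer<LatRecord>();
        try {
            byte[] bytes = raw.serialize(value);
            return "ok:" + bytes.length;
        } catch (ClassCastException e) {
            // The compiler-generated bridge method casts the argument to RecordBase.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new LatRecord("a")));
        System.out.println(trySerialize("{\"id\":1}"));
    }
}

// Hypothetical stand-in for a serializer interface.
interface ValueSerializer<T> {
    byte[] serialize(T value);
}

// Hypothetical stand-in for org.apache.avro.specific.SpecificRecordBase.
abstract class RecordBase {
    abstract byte[] toBytes();
}

// Hypothetical stand-in for a generated record class such as LatData.
class LatRecord extends RecordBase {
    private final String payload;

    LatRecord(String payload) {
        this.payload = payload;
    }

    @Override
    byte[] toBytes() {
        return payload.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for a serializer that only accepts record types.
class AvroLikeSerializer<T extends RecordBase> implements ValueSerializer<T> {
    @Override
    public byte[] serialize(T value) {
        return value.toBytes();
    }
}
```

Passing a record succeeds, while passing a JSON string fails with a ClassCastException, which is the same shape as the root cause in the log above.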

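【4】Solution: name the container factory to use in the listener's containerFactory attribute, and the problem goes away. Without that attribute, @KafkaListener falls back to the bean named kafkaListenerContainerFactory, which in this configuration is the Avro one. The value must match the bean name exactly; by default that is the name of the @Bean method.

@KafkaListener(topics = {ENTRY_TOPIC_NAME}, containerFactory = "stringKafkaListenerContainerFactory")
public void receive(String data) throws Exception {
    // ...
}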
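
The lookup that makes this fix work can be sketched in plain Java. This is an illustrative model, not Spring's implementation: FactoryLookupDemo, FACTORIES and resolveValueType are hypothetical names, and the map simply mirrors the two container factory beans declared in AvroConfig. The one fact taken from Spring Kafka is the default bean name, kafkaListenerContainerFactory, which is used when containerFactory is left empty.

```java
import java.util.HashMap;
import java.util.Map;

class FactoryLookupDemo {
    // Default bean name Spring Kafka uses when containerFactory is not set.
    static final String DEFAULT_FACTORY = "kafkaListenerContainerFactory";

    // Hypothetical registry: bean name -> value type delivered to the listener.
    static final Map<String, String> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put("kafkaListenerContainerFactory", "LatData");
        FACTORIES.put("stringKafkaListenerContainerFactory", "String");
    }

    // Returns the value type a listener would receive for the given
    // containerFactory attribute (empty or null means "use the default").
    static String resolveValueType(String containerFactory) {
        String beanName = (containerFactory == null || containerFactory.isEmpty())
                ? DEFAULT_FACTORY
                : containerFactory;
        String valueType = FACTORIES.get(beanName);
        if (valueType == null) {
            throw new IllegalArgumentException(
                    "No container factory bean named '" + beanName + "'");
        }
        return valueType;
    }

    public static void main(String[] args) {
        System.out.println(resolveValueType(""));
        System.out.println(resolveValueType("stringKafkaListenerContainerFactory"));
    }
}
```

In this model a listener with no containerFactory gets Avro-typed values, and one that names the String factory gets strings. Bean names are case-sensitive, so a name such as stringkafkaListenerContainerFactory would not match the stringKafkaListenerContainerFactory bean.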

