SpringBoot集成Kafka序列化与反序列化


maven依赖:

  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.14.RELEASE</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.11.1</version>
  </dependency>

 

序列化在yml中配置:

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: myGroup
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

 

反序列化:

    @Bean
    public RecordMessageConverter converter() { ObjectMapper mapper = new ObjectMapper(); mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // 下划线转驼峰 return new StringJsonMessageConverter(mapper); }

 

新增topic:

    @Bean
    public NewTopic topic() { return new NewTopic("my-test-topic", 1, (short) 1); }

 

测试生产者消费者:

 
 
@Component
public class KafkaTest {

@Autowired
private KafkaTemplate<String, Object> template;

/**
* 接收topic为database.table的消息并将消息转发到另一个topic:my-test-topic
*/
@KafkaListener(topics = "database.table")
public void process(MysqlSourceEvent<TaskData> record) {
template.send("my-test-topic", record);
}
}
 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM