Maven dependencies:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.14.RELEASE</version>
</dependency>
<!-- jackson-databind provides ObjectMapper and pulls in jackson-core transitively -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.11.1</version>
</dependency>
Configure serialization in the YAML file:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: myGroup
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
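Deserialization (the consumer keeps the default StringDeserializer; this converter turns the JSON string into the listener's parameter type):
@Bean
public RecordMessageConverter converter() {
    ObjectMapper mapper = new ObjectMapper();
    // Map snake_case JSON fields to camelCase Java properties
    mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    return new StringJsonMessageConverter(mapper);
}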
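To make the effect of the SNAKE_CASE naming strategy concrete, here is a minimal sketch of the name correspondence it establishes between JSON keys and Java properties. It uses only the standard library and is not Jackson's actual implementation; the class name SnakeCaseNames and the method toCamelCase are hypothetical helpers introduced for illustration.

```java
// Illustrative only: mimics the snake_case <-> camelCase name correspondence,
// not Jackson's internal algorithm.
final class SnakeCaseNames {
    private SnakeCaseNames() {}

    /** Converts a snake_case name such as "task_id" to camelCase ("taskId"). */
    static String toCamelCase(String snake) {
        StringBuilder out = new StringBuilder(snake.length());
        boolean upperNext = false;
        for (int i = 0; i < snake.length(); i++) {
            char c = snake.charAt(i);
            if (c == '_') {
                // Drop the underscore; capitalize the next character
                // unless the underscore is leading.
                upperNext = out.length() > 0;
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

With this mapping, a JSON key such as task_id would populate a Java property named taskId, which is the behavior the converter bean relies on.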
Create a topic:
@Bean
public NewTopic topic() {
    // Topic name, 1 partition, replication factor 1
    return new NewTopic("my-test-topic", 1, (short) 1);
}
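Test the producer and consumer:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, Object> template;

    /**
     * Receives messages from the topic database.table and forwards them
     * to another topic, my-test-topic.
     */
    @KafkaListener(topics = "database.table")
    public void process(MysqlSourceEvent<TaskData> record) {
        template.send("my-test-topic", record);
    }
}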
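The listener's parameter type, MysqlSourceEvent<TaskData>, is not defined anywhere in this post. As a rough sketch of what such payload classes might look like, the example below assumes a generic envelope around a row object. Every field name here (database, table, data, taskId, taskName) is an assumption made for illustration, not the author's actual schema.

```java
// Hypothetical payload types: the original post does not define them,
// so every field below is an assumption for illustration only.
class TaskData {
    private Long taskId;      // would bind to a JSON key "task_id" under SNAKE_CASE
    private String taskName;  // would bind to a JSON key "task_name"

    public Long getTaskId() { return taskId; }
    public void setTaskId(Long taskId) { this.taskId = taskId; }
    public String getTaskName() { return taskName; }
    public void setTaskName(String taskName) { this.taskName = taskName; }
}

// Generic envelope so the same wrapper can carry rows from different tables.
class MysqlSourceEvent<T> {
    private String database;  // assumed: source database name
    private String table;     // assumed: source table name
    private T data;           // assumed: the row payload

    public String getDatabase() { return database; }
    public void setDatabase(String database) { this.database = database; }
    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public T getData() { return data; }
    public void setData(T data) { this.data = data; }
}
```

Plain classes with a default constructor and getter/setter pairs are enough for Jackson to bind to, and keeping the envelope generic lets the listener's declared parameter type tell the converter which row class to build.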