Spring boot 基于 1.5.9版本,如果是基于2.x版本以上,spring-kafka 版本要基于2.1.X版本以上
1. maven 添加依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>5.2.1</version> </dependency>
2. maven 添加 plugin
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.9.0</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>avsc问价路径</sourceDirectory> <outputDirectory>avro代码生成路径</outputDirectory> </configuration> </execution> </executions> </plugin>
3. application.yml 添加 kafka配置
spring:
kafka:
bootstrap-servers: xxx
producer:
key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
group-id: test
key-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: xxx
security.protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";
template:
default-topic: xxx
4. Java Producer
@Component public class Producer { @Autowired private KafkaTemplate kafkaTemplate; public void send(AvroRecord record) { if (Objects.isNull(record)) { return; } LOGGER.info(record.toString()); kafkaTemplate.sendDefault("key", record); } }
5. Java Consumer
@Component public class Consumer { @KafkaListener(topics = "xxx") public void consume(ConsumerRecord<String, AvroRecord> message) { LOGGER.info("receive message:"); LOGGER.info("topic:" + message.topic()); LOGGER.info("key:" + message.key()); LOGGER.info("value:" + message.value()); } }
