SpringBoot整合Kafka发送复杂对象


1、依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.3</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.3</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、配置文件

spring.kafka.bootstrap-servers=192.168.21.120:9092
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.smile.domain
#消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
spring.kafka.listener.missing-topics-fatal=false
logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR

properties.spring.json.trusted.packages 配置com.smile.domain 包下的 Message 类们。因为 JsonDeserializer 在反序列化消息时,考虑到安全性,只反序列化成信任的 Message 类。 务必配置

在序列化时,使用了 JsonSerializer 序列化 Message 消息对象,它会在 Kafka 消息 Headers 的 TypeId 上,值为 Message 消息对应的类全名。

在反序列化时,使用了 JsonDeserializer 序列化出 Message 消息对象,它会根据 Kafka 消息 Headers 的 TypeId 的值,反序列化消息内容成该 Message 对象。

3、消息

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order {

    private Long orderId;

    private Long memberId;

    private String payType;

    private Date payTime;

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", memberId=" + memberId +
                ", payType='" + payType + '\'' +
                ", payTime=" + payTime +
                '}';
    }
}

生产者

@Component
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public SendResult sendOrderMessageSync() throws ExecutionException, InterruptedException {
        Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build();
        return kafkaTemplate.send(ORDER_TOPIC, order).get();
    }

    public ListenableFuture<SendResult<String, Object>> sendMessageAsync() {
        Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build();
        ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(ORDER_TOPIC, order);
        return result;
    }
}

消费者

@Slf4j
@Component
public class CommentConsumer {

    private static final String COMMENT_GROUP = "comment-group";

    @KafkaListener(topics = ORDER_TOPIC, groupId = COMMENT_GROUP)
    public void onMessage(Order order) {
        log.info("【评论】接受消息内容:{}", order);
    }
}


@Slf4j
@Component
public class DeliveryConsumer {

    private static final String DELIVERY_GROUP = "delivery-group";

    @KafkaListener(topics = ORDER_TOPIC, groupId = DELIVERY_GROUP)
    public void onMessage(Order order) {
        log.info("【物流】接收到消息内容:{}", order);
    }
}

单元测试

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = BootKafkaApplication.class)
public class ProducerTest {

    @Autowired
    private OrderProducer orderProducer;

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        SendResult sendResult = orderProducer.sendOrderMessageSync();

        log.info("result=topic:{} partition:{} offset:{}",
                sendResult.getRecordMetadata().topic(),
                sendResult.getRecordMetadata().partition(),
                sendResult.getRecordMetadata().offset());
        Thread.currentThread().join(10000);
    }

    @Test
    public void testAsyncSend() throws InterruptedException {
        orderProducer.sendMessageAsync().addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("发送消息异常:{}", ex);
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("回调结果 Result =  topic:[{}] , partition:[{}], offset:[{}]",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
        Thread.currentThread().join(5000);
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM