springcloud集成kafka


项目名称:布控预警 水平拆分出来的项目,作为一个单独的可以对外提供服务的项目

项目设计:springcloud,可以集成各个不同平台的一个作为对外提供的微服务项目

项目功能:实现各个平台和本平台之间的布控(对人员和摄像头进行和厂商对接可以进行实时抓拍)和预警(厂商抓拍到之后实时通知给本平台)业务。

先说明一下我的kafka在项目中的应用场景:

由于有多个平台对接本平台,中间有一些消息机制,比如摄像头抓拍到某个人员之后会由视频厂商进行通知到本平台,然后本平台根据业务来源和平台来源分别组建不同的消息进行分平台分+业务形式实现消息分发。

实现方式:

1、pom.xml

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

 

2、application.yml

  cloud:
     stream:
        kafka:
          binder:
            brokers:  127.0.0.1:9092  # kafka服务地址和端口
            zk-nodes: 127.0.0.1:2181  # ZK的集群配置地址和端口

 

3、可以根据表中的topic名称自动初始化topic到kafka,后续如果有新增的平台和业务可以动态根据代码自动生成topic

package com.tianque.xueliang.controlalarm.config;

import com.tianque.xueliang.controlalarm.domain.vo.TopicConfigVo;
import com.tianque.xueliang.controlalarm.service.TopicConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @Title: TopicConfig
 * @Description: 项目启动动态读取kafka的topic配置并创建topic,创建完成回写表状态已在kafka生成topic
 * @author: sxs@hztianque.com
 * @date: Created in 10:25 2019/8/5
 * @Modifired by:
 */
@Component
public class KafkaTopicConfig {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConfig.class);

    @Autowired
    private TopicConfigService topicConfigService;

    @Autowired
    private BinderAwareChannelResolver resolver;

    @PostConstruct
    public void initKafkaTopic() {
        // 获取本地数据表中的topic,只获取kafka中没有的topic
        List<TopicConfigVo> list = topicConfigService.getTopicList();
        try{
            // 循环去生成topic,生成完毕将表中的状态更新为kafka已存在
            for (TopicConfigVo topicConfigVo: list) {
                String topicName = topicConfigVo.getPlatformId() + topicConfigVo.getWorkName();
                // 这行代码是动态去生成topic的,先检查kafka中有没有传入的topic,有就直接返回topic,没有则新建
                MessageChannel messageChannel = resolver.resolveDestination(topicName);
                if (null != messageChannel) {
                    // 更新表中的状态为kafka中已存在改topic
          topicConfigService.updateTopicStatusById(topicConfigVo.getTopicId()); } } }
catch (Exception e) { logger.error("kafka.topic初始化创建失败..", e); } } }
@PostConstruct注解:作用在方法上,表示项目启动自动加载该方法

4、发送消息代码:

package com.tianque.xueliang.controlalarm.stream;

import com.tianque.xueliang.controlalarm.domain.msg.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

/**
 * Created with IntelliJ IDEA.
 * @author : sunxuesong
 * Date: 2019/3/12
 * Time: 上午11:16
 * To change this template use File | Settings | File Templates.
 * Description:
 */
@Service
public class WarningService {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private BinderAwareChannelResolver resolver;

    /**
     * 发送预警消息到指定topic,这里的topic是由平台编码+平台名称组成
     * 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
     * @param warnings
     * @param topic
     * @return
     */
    public String sendWarning(final Msg warnings, String topic) {
        logger.info("Sending warnings {}", warnings);

        // 获取预警的topic,然后发送预警消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(warnings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }

    /**
     * 发送布控消息到指定topic,这里的topic是由平台编码+平台名称组成
     * 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
     * @param msg
     * @param topic
     * @return
     */
    public String sendControl(final Msg msg, String topic) {
        logger.info("Sending controlMsg {}", msg);
        // 获取布控的topic,然后发送布控消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }
}

说明一下:springcloud和springboot集成kafka的方式不一样,虽然都是通过application.yml去做配置,但是jar包和配置还是有区别的。

如有不清楚欢迎留言..


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM