springcloud集成kafka


項目名稱:布控預警 水平拆分出來的項目,作為一個單獨的可以對外提供服務的項目

項目設計:springcloud,可以集成各個不同平台的一個作為對外提供的微服務項目

項目功能:實現各個平台和本平台之間的布控(對人員和攝像頭進行和廠商對接可以進行實時抓拍)和預警(廠商抓拍到之后實時通知給本平台)業務。

先說明一下我的kafka在項目中的應用場景:

由於有多個平台對接本平台,中間有一些消息機制,比如攝像頭抓拍到某個人員之后會由視頻廠商進行通知到本平台,然后本平台根據業務來源和平台來源分別組建不同的消息進行分平台分+業務形式實現消息分發。

實現方式:

1、pom.xml

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

 

2、application.yml

  cloud:
     stream:
        kafka:
          binder:
            brokers:  127.0.0.1:9092  # kafka服務地址和端口
            zk-nodes: 127.0.0.1:2181  # ZK的集群配置地址和端口

 

3、可以根據表中的topic名稱自動初始化topic到kafka,后續如果有新增的平台和業務可以動態根據代碼自動生成topic

package com.tianque.xueliang.controlalarm.config;

import com.tianque.xueliang.controlalarm.domain.vo.TopicConfigVo;
import com.tianque.xueliang.controlalarm.service.TopicConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @Title: TopicConfig
 * @Description: 項目啟動動態讀取kafka的topic配置並創建topic,創建完成回寫表狀態已在kafka生成topic
 * @author: sxs@hztianque.com
 * @date: Created in 10:25 2019/8/5
 * @Modifired by:
 */
@Component
public class KafkaTopicConfig {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConfig.class);

    @Autowired
    private TopicConfigService topicConfigService;

    @Autowired
    private BinderAwareChannelResolver resolver;

    @PostConstruct
    public void initKafkaTopic() {
        // 獲取本地數據表中的topic,只獲取kafka中沒有的topic
        List<TopicConfigVo> list = topicConfigService.getTopicList();
        try{
            // 循環去生成topic,生成完畢將表中的狀態更新為kafka已存在
            for (TopicConfigVo topicConfigVo: list) {
                String topicName = topicConfigVo.getPlatformId() + topicConfigVo.getWorkName();
                // 這行代碼是動態去生成topic的,先檢查kafka中有沒有傳入的topic,有就直接返回topic,沒有則新建
                MessageChannel messageChannel = resolver.resolveDestination(topicName);
                if (null != messageChannel) {
                    // 更新表中的狀態為kafka中已存在改topic
          topicConfigService.updateTopicStatusById(topicConfigVo.getTopicId()); } } }
catch (Exception e) { logger.error("kafka.topic初始化創建失敗..", e); } } }
@PostConstruct注解:作用在方法上,表示項目啟動自動加載該方法

4、發送消息代碼:

package com.tianque.xueliang.controlalarm.stream;

import com.tianque.xueliang.controlalarm.domain.msg.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

/**
 * Created with IntelliJ IDEA.
 * @author : sunxuesong
 * Date: 2019/3/12
 * Time: 上午11:16
 * To change this template use File | Settings | File Templates.
 * Description:
 */
@Service
public class WarningService {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private BinderAwareChannelResolver resolver;

    /**
     * 發送預警消息到指定topic,這里的topic是由平台編碼+平台名稱組成
     * 若發現kafka中沒有該topic,它會自動創建一個由平台編碼+平台名稱組成的topic
     * @param warnings
     * @param topic
     * @return
     */
    public String sendWarning(final Msg warnings, String topic) {
        logger.info("Sending warnings {}", warnings);

        // 獲取預警的topic,然后發送預警消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(warnings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }

    /**
     * 發送布控消息到指定topic,這里的topic是由平台編碼+平台名稱組成
     * 若發現kafka中沒有該topic,它會自動創建一個由平台編碼+平台名稱組成的topic
     * @param msg
     * @param topic
     * @return
     */
    public String sendControl(final Msg msg, String topic) {
        logger.info("Sending controlMsg {}", msg);
        // 獲取布控的topic,然后發送布控消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }
}

說明一下:springcloud和springboot集成kafka的方式不一樣,雖然都是通過application.yml去做配置,但是jar包和配置還是有區別的。

如有不清楚歡迎留言..


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM