RabbitMQ實現消息的發送和數據同步


引言

    最近參與了一個智慧城市綜合管理平台的項目,主要核心業務就是針對視頻監控管理統計分析城市車輛的相關信息,涉及到幾個平台以及和其他公司合作開發業務的場景,需要對車輛數據進行同步和共享,中間就用到了RabbitMQ消息中間件,對車輛的大數據進行發送監控和不同系統間的信息同步,下邊就簡單梳理講解一下RabbitMQ發送消息的過程和業務。具體的RabbitMQ服務端和客戶端的配置在此就不講解了。

  1:BMS同步車輛大數據的消息實體CLS_VO_Ignite_Message

package com.tiandy.easy7.core.vo;

import java.util.List;

/**
 * Message structure used to synchronise vehicle big-data between systems.
 *
 * <p>A plain mutable bean: every field has a public getter and setter, and
 * the overloaded constructors only pre-populate the subset of fields that a
 * given kind of message carries. Fields a constructor does not receive stay
 * {@code null}.
 */
public class CLS_VO_Ignite_Message {

    /** Identifies that the message originates from the BMS system. */
    private String source;

    /** Message type, e.g. {@code user}. */
    private String type;

    /** Operation: {@code add} (create), {@code del} (delete) or {@code update}. */
    private String operate;

    /** User information. */
    private CLS_VO_Ignite_User user;

    /** Permission information of the user. */
    private List<CLS_VO_Ignite_Role> role_list;

    /** Tollgate-to-role list. */
    private List<CLS_VO_Ignite_Tollgate> tollgate_list;

    /** Camera-to-role list. */
    private List<CLS_VO_Ignite_Camera> camera_list;

    /** Area parameters. */
    private List<CLS_VO_AreaParam> area_list;

    /** Collection of longitude/latitude coordinates. */
    private List<CLS_VO_GisCoordinates> gis_list;

    /** Domain information. */
    private CLS_VO_DomainInfoEx domainInfo;

    /** Creates an empty message; all fields are {@code null}. */
    public CLS_VO_Ignite_Message() {
    }

    /**
     * Creates a message carrying user, role, tollgate, camera and area data.
     *
     * @param source        message origin
     * @param type          message type
     * @param operate       operation name
     * @param user          user information
     * @param role_list     roles of the user
     * @param tollgate_list tollgate-to-role list
     * @param camera_list   camera-to-role list
     * @param area_list     area parameters
     */
    public CLS_VO_Ignite_Message(String source, String type, String operate, CLS_VO_Ignite_User user, List<CLS_VO_Ignite_Role> role_list, List<CLS_VO_Ignite_Tollgate> tollgate_list, List<CLS_VO_Ignite_Camera> camera_list, List<CLS_VO_AreaParam> area_list) {
        fillHeader(source, type, operate);
        this.user = user;
        this.role_list = role_list;
        this.tollgate_list = tollgate_list;
        this.camera_list = camera_list;
        this.area_list = area_list;
    }

    /**
     * Creates a message carrying GIS coordinates.
     *
     * @param source   message origin
     * @param type     message type
     * @param operate  operation name
     * @param gis_list longitude/latitude collection
     */
    public CLS_VO_Ignite_Message(String source, String type, String operate, List<CLS_VO_GisCoordinates> gis_list) {
        fillHeader(source, type, operate);
        this.gis_list = gis_list;
    }

    /**
     * Creates a message carrying domain information.
     *
     * @param source     message origin
     * @param type       message type
     * @param operate    operation name
     * @param domainInfo domain information
     */
    public CLS_VO_Ignite_Message(String source, String type, String operate, CLS_VO_DomainInfoEx domainInfo) {
        fillHeader(source, type, operate);
        this.domainInfo = domainInfo;
    }

    /** Stores the three header fields shared by every non-empty constructor. */
    private void fillHeader(String source, String type, String operate) {
        this.source = source;
        this.type = type;
        this.operate = operate;
    }

    // ---- accessors, in field declaration order ----

    public String getSource() {
        return this.source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getOperate() {
        return this.operate;
    }

    public void setOperate(String operate) {
        this.operate = operate;
    }

    public CLS_VO_Ignite_User getUser() {
        return this.user;
    }

    public void setUser(CLS_VO_Ignite_User user) {
        this.user = user;
    }

    public List<CLS_VO_Ignite_Role> getRole_list() {
        return this.role_list;
    }

    public void setRole_list(List<CLS_VO_Ignite_Role> role_list) {
        this.role_list = role_list;
    }

    public List<CLS_VO_Ignite_Tollgate> getTollgate_list() {
        return this.tollgate_list;
    }

    public void setTollgate_list(List<CLS_VO_Ignite_Tollgate> tollgate_list) {
        this.tollgate_list = tollgate_list;
    }

    public List<CLS_VO_Ignite_Camera> getCamera_list() {
        return this.camera_list;
    }

    public void setCamera_list(List<CLS_VO_Ignite_Camera> camera_list) {
        this.camera_list = camera_list;
    }

    public List<CLS_VO_AreaParam> getArea_list() {
        return this.area_list;
    }

    public void setArea_list(List<CLS_VO_AreaParam> area_list) {
        this.area_list = area_list;
    }

    public List<CLS_VO_GisCoordinates> getGis_list() {
        return this.gis_list;
    }

    public void setGis_list(List<CLS_VO_GisCoordinates> gis_list) {
        this.gis_list = gis_list;
    }

    public CLS_VO_DomainInfoEx getDomainInfo() {
        return this.domainInfo;
    }

    public void setDomainInfo(CLS_VO_DomainInfoEx domainInfo) {
        this.domainInfo = domainInfo;
    }
}

2:發送RabbitMQ消息的幫助類CLS_RabbitMQClientSend

package com.tiandy.easy7.its.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import cn.jpush.api.utils.StringUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.tiandy.easy7.core.bo.CLS_BO_User;
import easy7.datatype.CLS_Easy7_Error;
import easy7.datatype.CLS_Easy7_Types;
import org.apache.log4j.Logger;

/**
 * Static helper that publishes BMS synchronisation messages to RabbitMQ.
 *
 * <p>The connection and channel are held in static fields and shared by all
 * callers. No synchronisation is performed here, so callers that invoke
 * {@link #initialize()}, {@link #sendMsg(String)} and
 * {@link #closeConnection()} from several threads must coordinate themselves.
 */
public class CLS_RabbitMQClientSend {

    private static final Logger log = Logger.getLogger(CLS_RabbitMQClientSend.class);

    /** Name of the durable fanout exchange that sync messages are published to. */
    private static final String SYNC_EXCHANGE_NAME = "bms_sync_user_auth";

    /** Shared RabbitMQ connection; {@code null} until {@link #initialize()} succeeds. */
    private static Connection connection = null;
    /** Shared RabbitMQ channel; {@code null} until {@link #initialize()} succeeds. */
    private static Channel channel = null;
    /** Connection status flag: {@code true} after a successful {@link #initialize()}. */
    public static boolean connectStatus = false;

    /** @return the shared connection, or {@code null} when not connected */
    public Connection getConnection() {
        return connection;
    }

    /** @return the shared channel, or {@code null} when not connected */
    public Channel getChannel() {
        return channel;
    }

    /**
     * (Re)creates the shared connection and channel and declares the durable
     * fanout exchange. Any previously held connection is closed first.
     *
     * <p>On failure {@link #connectStatus} is set to {@code false}, the error
     * is logged with its cause, and whatever was opened so far is released so
     * that no half-initialised connection lingers.
     */
    public static void initialize() {
        try {
            ConnectionFactory factory = CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
            // Drop the previous connection/channel before opening new ones.
            closeConnection();
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Durable exchange in FANOUT (broadcast) mode.
            channel.exchangeDeclare(SYNC_EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            connectStatus = true;
        } catch (Exception e) {
            connectStatus = false;
            log.error("CLS_RabbitMQClientSend method initialize error!", e);
            closeConnection();
        }
    }

    /**
     * Publishes a message to the sync exchange (BMS to vehicle big-data).
     *
     * @param info message text; it is encoded as UTF-8, which is what the
     *             consumer side decodes
     * @return {@code CLS_Easy7_Error.ERROR_PARAM} when {@code info} is empty,
     *         {@code CLS_Easy7_Error.ERROR_OK} when the publish call returned
     *         normally, {@code CLS_Easy7_Error.ERROR_REQUEST_FAILED} when no
     *         channel is available or publishing threw
     */
    public static int sendMsg(String info) {
        if (StringUtils.isEmpty(info)) {
            return CLS_Easy7_Error.ERROR_PARAM;
        }
        // Read the static once so a concurrent close cannot null it mid-call.
        Channel current = channel;
        if (current == null) {
            log.error("CLS_RabbitMQClientSend method sendMsg error! channel is not initialized");
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
        try {
            // Log the text itself; logging the byte[] would only print its identity hash.
            log.debug("CLS_RabbitMQClientSend sendMsg:" + info);
            // Fanout exchanges ignore the routing key, hence the empty string.
            current.basicPublish(SYNC_EXCHANGE_NAME, "", null, info.getBytes("UTF-8"));
            return CLS_Easy7_Error.ERROR_OK;
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend method sendMsg error!", e);
            return CLS_Easy7_Error.ERROR_REQUEST_FAILED;
        }
    }

    /**
     * Closes the shared channel and connection, if any. Failures are logged
     * and the static handles are always reset to {@code null}, so a stale
     * closed handle is never left behind.
     */
    public static void closeConnection() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend closeConnection: closing channel failed", e);
        } finally {
            channel = null;
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.error("CLS_RabbitMQClientSend closeConnection: closing connection failed", e);
        } finally {
            connection = null;
        }
    }

}

3:RabbitMQ的核心幫助類CLS_RabbitMQUtil

package com.tiandy.easy7.its.rabbitmq;

import javax.annotation.Resource;

import com.tiandy.easy7.core.util.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import net.sf.json.JSONObject;

import com.rabbitmq.client.ConnectionFactory;
import com.tiandy.easy7.core.bo.CLS_BO_SystemInfo;
import com.tiandy.easy7.core.po.TabSystemInfo;
import com.tiandy.easy7.core.util.MyApplicationContextUtil;
import com.tiandy.easy7.core.vo.CLS_VO_Result;
import com.tiandy.easy7.face.rabbitmq.CLS_RabbitMQExchangeRecive;

import easy7.datatype.CLS_Easy7_Types;

import java.net.InetAddress;

/**
 * RabbitMQ utility class: resolves the broker host from the platform's
 * system-info store and hands out a shared, pre-configured
 * {@link ConnectionFactory}.
 */
public class CLS_RabbitMQUtil {

    private static final Logger log = LoggerFactory.getLogger(CLS_RabbitMQUtil.class);

    /** Host used when no RabbitMQ address has been configured yet. */
    private static final String DEFAULT_RABBITMQ_HOST = "127.0.0.1";

    /** Utility class; not instantiable. */
    private CLS_RabbitMQUtil() {
    }

    /**
     * Returns the shared RabbitMQ connection factory with its host set.
     *
     * <p>The host is refreshed from the system-info store on every call; when
     * nothing is configured (first use of the platform) it falls back to
     * {@code 127.0.0.1}.
     *
     * @return the shared connection factory
     */
    public static final ConnectionFactory getRabbitMQConnectionFactory() {
        // After a platform restart the RabbitMQ IP is re-read from the database.
        getRabbitMQIP();
        if (null == CLS_Easy7_Types.rabbitmq_host_for_bj || "".equals(CLS_Easy7_Types.rabbitmq_host_for_bj)) {
            CLS_Easy7_Types.rabbitmq_host_for_bj = DEFAULT_RABBITMQ_HOST;
        }
        RabbitMQConnectionFactory.factory.setHost(CLS_Easy7_Types.rabbitmq_host_for_bj);
        return RabbitMQConnectionFactory.factory;
    }

    /**
     * Loads the RabbitMQ IP address from the system-info store into
     * {@code CLS_Easy7_Types.rabbitmq_host_for_bj} and, if it is still unset,
     * fills {@code CLS_Easy7_Types.LOCALHOST_IP} with the local server IP.
     *
     * <p>When the stored configuration cannot be read or contains no IP, an
     * error is logged and the method returns without changing either value.
     */
    public static void getRabbitMQIP() {
        ApplicationContext ctx = MyApplicationContextUtil.getContext();
        CLS_BO_SystemInfo boSystemInfo = ctx.getBean(CLS_BO_SystemInfo.class);

        String rabbitmqIp;
        try {
            rabbitmqIp = readConfiguredRabbitMqIp(boSystemInfo);
        } catch (Exception e) {
            log.error("rabbitmq id is not exist!", e);
            return;
        }
        if (rabbitmqIp == null || "".equals(rabbitmqIp)) {
            log.error("rabbitmq ip is not exist!");
            return;
        }
        // Publish the configured host; getRabbitMQConnectionFactory() reads it next.
        CLS_Easy7_Types.rabbitmq_host_for_bj = rabbitmqIp;

        // The local server IP is used as the routing key for receiving
        // permission requests from subordinate platforms.
        try {
            if (null == CLS_Easy7_Types.LOCALHOST_IP || "".equals(CLS_Easy7_Types.LOCALHOST_IP)) {
                CLS_Easy7_Types.LOCALHOST_IP = Tools.getLinuxLocalIp();
            }
        } catch (Exception e) {
            log.error("get LinuxIp error", e);
        }
    }

    /**
     * Reads the {@code rabbitmqIp} value out of the stored system-info record.
     *
     * <p>The record is looked up by {@code CLS_Easy7_Types.EASY7_ACTIVEMQ_SID}
     * (a commented-out line in the original suggests a value like
     * {@code "activemq-server-config-2017728"} - TODO confirm). The IP is
     * nested as {@code content -> sParam -> rabbitmqIp}, where {@code content}
     * and {@code sParam} are themselves JSON strings.
     *
     * @param boSystemInfo business object used to load the record
     * @return the configured IP, or {@code null} when the lookup reports a
     *         non-zero {@code ret} or an intermediate value is empty
     * @throws Exception when the lookup or JSON parsing fails
     */
    private static String readConfiguredRabbitMqIp(CLS_BO_SystemInfo boSystemInfo) throws Exception {
        CLS_VO_Result result = boSystemInfo.getSystemInfo(CLS_Easy7_Types.EASY7_ACTIVEMQ_SID);
        JSONObject json = JSONObject.fromObject(result);
        if (json.getInt("ret") != 0) {
            return null;
        }
        String content = json.getString("content");
        if (content == null || "".equals(content)) {
            return null;
        }
        String sParam = JSONObject.fromObject(content).getString("sParam");
        if (sParam == null || "".equals(sParam)) {
            return null;
        }
        return JSONObject.fromObject(sParam).getString("rabbitmqIp");
    }

    /** Holder for the shared RabbitMQ connection factory. */
    private static class RabbitMQConnectionFactory {
        private static final ConnectionFactory factory = new ConnectionFactory();
        static {
            try {
                // NOTE(review): port and credentials are hard-coded; the original
                // comment says they can be moved to a properties file.
                factory.setPort(5673);                     // broker port
                factory.setUsername("admin");              // broker user name
                factory.setPassword("123456");             // broker password
                factory.setRequestedHeartbeat(10);         // heartbeat, in seconds
                factory.setAutomaticRecoveryEnabled(true); // recover connections automatically
            } catch (Exception e) {
                log.error("RabbitMQ connection factory configuration failed", e);
            }
        }
    }
}

4:獲取需要發送MQ的消息

public CLS_VO_Ignite_Message deleteHostCamera(){
//定義一個空的list模擬數據,具體實際業務中里邊肯定是有數據的(需要發現車輛的數據消息) List
<CLS_VO_Ignite_Camera> cameralist = new ArrayList<CLS_VO_Ignite_Camera>(); //定義拼接向Rabbit發送的消息體 (json格式的數據) CLS_VO_Ignite_Message message = new CLS_VO_Ignite_Message(CLS_Easy7_Types.MESSAGE_SOURCE_BMS,CLS_Easy7_Types.MESSAGE_TYPE_CAMERA, CLS_Easy7_Types.MESSAGE_OPERATE_DEL,null,null,null,cameralist,null); return message; }

5:發送消息

public void sendRabbitMQMessage(){
  //如發送的消息:
//{"id":"f6560157-24ae-475b-9736-34072684b008","reviewInfo":"","status":1,"ids":[],"currentUserId":"admin","reviewUser":"admin"}
CLS_VO_Ignite_Message message
=deleteHostCamera(); //消息為空時不發送消息 if(null != message.getSource() && !"".equals(message.getSource())){ int sendResult = CLS_RabbitMQClientSend.sendMsg(JSONObject.fromObject(message).toString()); //同步消息失敗,錯誤代碼使用-40,數據庫操作回滾 if(sendResult != 0){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); //事務回滾 result.setRet( -40);//請求失敗 return result; } } }

6:接收MQ中發送的消息

  1 package com.tiandy.vbs.ics.rabbitMQ;
  2 import java.io.IOException;
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 import java.util.concurrent.ArrayBlockingQueue;
  6 import java.util.concurrent.Future;
  7 import java.util.concurrent.ThreadPoolExecutor;
  8 import java.util.concurrent.TimeUnit;
  9 
 10 import javax.annotation.PostConstruct;
 11 
 12 import org.springframework.beans.factory.annotation.Autowired;
 13 import org.springframework.stereotype.Component;
 14 
 15 import com.rabbitmq.client.AMQP;
 16 import com.rabbitmq.client.BuiltinExchangeType;
 17 import com.rabbitmq.client.Channel;
 18 import com.rabbitmq.client.Connection;
 19 import com.rabbitmq.client.ConnectionFactory;
 20 import com.rabbitmq.client.Consumer;
 21 import com.rabbitmq.client.DefaultConsumer;
 22 import com.rabbitmq.client.Envelope;
 23 import com.tiandy.vbs.adapter.rabbitmq.CLS_ConsumerThreadPool;
 24 import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQQueueRecive;
 25 import com.tiandy.vbs.adapter.rabbitmq.CLS_RabbitMQUtil;
 26 import com.tiandy.vbs.common.util.CLS_VBS_Types;
 27 import com.tiandy.vbs.common.util.InterfaceLogger;
 28 import com.tiandy.vbs.ics.bo.CLS_BO_DispositionAdapter;
 29 import com.tiandy.vbs.ics.vo.CLS_VO_MQMessage;
 30 
 31 @Component
 32 public class CLS_QueueDispositionRecive extends CLS_RabbitMQQueueRecive{
 33     //rabbitmq連接
 34     private static Connection connection = null;
 35     //rabbitmq通道
 36     private static Channel channel = null ;
 37     //連接狀態標識
 38     public boolean connectStatus = false;
 39     //消費者線程池
 40     public static CLS_ConsumerThreadPool consumerThreadPool = new CLS_ConsumerThreadPool(CLS_VBS_Types.corePoolSize,CLS_VBS_Types.maximumPoolSize,
 41             CLS_VBS_Types.keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue), new ThreadPoolExecutor.CallerRunsPolicy());
 42     @Autowired
 43     private CLS_BO_DispositionAdapter boAdapter;
 44     public static CLS_QueueDispositionRecive queueRecive;    
 45     @PostConstruct
 46     public void init() {
 47         queueRecive = this;
 48     }
 49     public Connection getConnection() {
 50         return connection;
 51     }
 52     public Channel getChannel() {
 53         return channel;
 54     }
 55     public void initialize(){
 56         try {
 57             //連接工廠
 58             ConnectionFactory factory= CLS_RabbitMQUtil.getRabbitMQConnectionFactory();
 59             //關閉連接與通道
 60             closeConnection();
 61             connection = factory.newConnection();  
 62             channel = connection.createChannel();  
 63             syncConsumer();
 64             connectStatus = true ;
 65         } catch (Exception e) {
 66             connectStatus = false ;
 67             e.printStackTrace();
 68             InterfaceLogger.error("CLS_RabbitMQQueueRecive method initialize:"+e.getMessage(),e);
 69         }
 70     }
//核心同步接收消息
71 public void syncConsumer()throws Exception{ 72 Map<String, Object> args = new HashMap<String, Object>(); 73 args.put("x-max-length", 100000); 74 args.put("x-message-ttl",CLS_VBS_Types.x_message_ttl); 75 76 channel.queueDeclare(CLS_VBS_Types.rabbitmq_queue_alarm_record,true,false,false,args); 77 //聲明交換器 78 channel.exchangeDeclare(CLS_VBS_Types.rabbitmq_exchange_name, BuiltinExchangeType.FANOUT ,true); 79 channel.queueBind(CLS_VBS_Types.rabbitmq_queue_alarm_record, CLS_VBS_Types.rabbitmq_exchange_name, ""); 80 //消費者 81 Consumer consumer = new DefaultConsumer(channel) { 82 @Override 83 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 84 //接收到的消息 85 String message = new String(body, "UTF-8"); 86 InterfaceLogger.debug("recive message:"+message); 87 if(!"".equals(message)){ 88 CLS_VO_MQMessage mqVo = new CLS_VO_MQMessage(); 89 mqVo.setMessage(message); 90 mqVo.setEnvelope(envelope); 91 CLS_DispositionConsumer worker = new CLS_DispositionConsumer(mqVo,queueRecive.boAdapter,channel); 92 consumerThreadPool.submit(worker);//多線程處理數據 93 }else { 94 channel.basicAck(envelope.getDeliveryTag(), false); 95 } 96 } 97 }; 98 //消息手動確認 99 channel.basicConsume(CLS_VBS_Types.rabbitmq_queue_alarm_record, false, consumer); 100 } 101 //關閉連接 102 public void closeConnection(){ 103 try { 104 if (channel != null) { 105 if(channel.isOpen()) { 106 channel.close(); 107 channel = null; 108 } 109 } 110 } catch (Exception e) { 111 InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeChannel error " + e); 112 e.printStackTrace(); 113 } 114 try { 115 if (connection != null) { 116 if(connection.isOpen()) { 117 connection.close(); 118 connection = null; 119 } 120 } 121 } catch (Exception e) { 122 InterfaceLogger.error("CLS_RabbitMQExchangeRecive closeConnection error " + e); 123 e.printStackTrace(); 124 } 125 } 126 }

7:接收類的抽象父類CLS_RabbitMQQueueRecive(持有處理消息的消費者線程池)

 1 package com.tiandy.vbs.adapter.rabbitmq;
 2 
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.ThreadPoolExecutor;
 5 import java.util.concurrent.TimeUnit;
 6 import org.springframework.stereotype.Component;
 7 import com.tiandy.vbs.common.util.CLS_VBS_Types;
 8 @Component
 9 public abstract class CLS_RabbitMQQueueRecive {
10     
11     //消費者線程池
12     public static CLS_ConsumerThreadPool consumerThreadPool = new CLS_ConsumerThreadPool(CLS_VBS_Types.corePoolSize,CLS_VBS_Types.maximumPoolSize,
13             CLS_VBS_Types.keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CLS_VBS_Types.workQueue), new ThreadPoolExecutor.CallerRunsPolicy());    
14     //初始化連接
15     public abstract void initialize();
16     //消費者
17     public abstract void syncConsumer()throws Exception;
18     //關閉連接
19     public abstract void closeConnection();
20 }

其他應用或平台接收到MQ中的消息後,解析JSON數據,轉換為對應的數據實體,將數據添加到相應的庫表中即可,這樣就完成了消息的發送和數據的同步!

至此,發送消息的過程就完成了!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM