MQTT 通讯接口 DEMO


package com.qx.pos.util;

import com.tzx.base.common.util.PosPropertyUtil;
import com.tzx.framework.common.util.SpringConext;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.util.Date;

/**
 * Static helper for publishing MQTT messages through the Spring-managed
 * {@code mqttHandler} bean.
 *
 * <p>Publishing is skipped when the {@code mqtt.switch} property equals {@code off}
 * (case-insensitive). The property is read once, when this class is initialized.
 *
 * @author hebing
 */
public class MqttUtil {
    /** Outbound handler, fetched from the application context on first use. */
    private static MqttPahoMessageHandler mqttHandler;

    /** Trimmed value of the {@code mqtt.switch} property; may be {@code null}. */
    public static String mqttSwitch;
    static {
        mqttSwitch = PosPropertyUtil.getMsg("mqtt.switch");
        if (mqttSwitch != null) {
            mqttSwitch = mqttSwitch.trim();
        }
    }

    /**
     * Sends an MQTT message with the defaults: QoS 2 (exactly-once delivery) and an
     * expiration 48 hours from now.
     *
     * @param topic   message topic
     * @param content message payload
     */
    public static void send(String topic, String content) {
        send(topic, content, 2, 48 * 60 * 60 * 1000L);
    }

    /**
     * Sends an MQTT message.
     *
     * <p>Does nothing except print a notice when the {@code mqtt.switch} property is
     * {@code off}.
     *
     * @param topic   message topic, stored in the {@link MqttHeaders#TOPIC} header
     * @param content message payload
     * @param qos     quality of service, stored in the {@link MqttHeaders#QOS} header
     * @param ttl     time to live in milliseconds; added to the current time to form the
     *                message expiration date
     */
    public static void send(String topic, String content, int qos, long ttl) {
        // equalsIgnoreCase returns false for a null argument, so no separate null check is needed.
        if ("off".equalsIgnoreCase(mqttSwitch)) {
            // Terminate the line so this notice does not run into the next console output.
            System.out.print("mqtt send message switch off !!!\r\n");
            return;
        }

        MqttPahoMessageHandler handler = resolveHandler();

        System.out.printf("mqtt send msg -- topic:%s   content:%s   \r\n", topic, content);
        long nowMillis = System.currentTimeMillis();

        // Build the message
        Message<String> message = MessageBuilder.withPayload(content)
                .setHeader(MqttHeaders.TOPIC, topic)
                .setHeader(MqttHeaders.QOS, qos)
                .setExpirationDate(new Date(nowMillis + ttl))
                .build();
        // Send the message
        handler.handleMessage(message);
    }

    /**
     * Returns the outbound handler, looking up the {@code mqttHandler} bean on first use.
     * Synchronized on this class so that concurrent first calls perform a single lookup.
     */
    private static synchronized MqttPahoMessageHandler resolveHandler() {
        if (mqttHandler == null) {
            mqttHandler = (MqttPahoMessageHandler) SpringConext.getApplicationContext().getBean("mqttHandler");
        }
        return mqttHandler;
    }
}

  

Spring 配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/integration/mqtt
        http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.3.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- Paho client factory referenced by the outbound handler and the inbound adapter below. -->
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <!-- Broker credentials are disabled; the two properties below are commented out. -->
        <!--
        <property name="userName" value="${mqtt.username}"/>
        <property name="password" value="${mqtt.password}"/>
        -->
        <property name="cleanSession" value="false"/>
        <!-- NOTE(review): presumably seconds, per the Paho connect options; confirm. -->
        <property name="keepAliveInterval" value="30"/>
        <!-- Broker address list; the single entry comes from the mqtt.url property. -->
        <property name="serverURIs">
            <array>
                <value>${mqtt.url}</value>
            </array>
        </property>
    </bean>

    <!-- Outbound (publishing) handler. MqttUtil.send looks this bean up by the id "mqttHandler". -->
    <bean id="mqttHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="pos-server-sender"/>
        <constructor-arg name="clientFactory" ref="clientFactory"/>
        <property name="async" value="true"/>
        <!-- Defaults for messages without their own QoS / retained headers; MqttUtil.send always sets the QoS header. -->
        <property name="defaultQos" value="2"/>
        <property name="defaultRetained" value="false"/>
        <!-- NOTE(review): presumably milliseconds; confirm against the handler documentation. -->
        <property name="completionTimeout" value="15000"/>
    </bean>

    <!-- Inbound message adapter: subscribes to the "upload_devices_status" topic and
         forwards what it receives to the "receiveMsg" channel. -->
    <int-mqtt:message-driven-channel-adapter
            id="mqttInbound"
            client-id="robot-devices-receiver"
            url="${mqtt.url}"
            topics="upload_devices_status"
            client-factory="clientFactory"
            send-timeout="20000"
            channel="receiveMsg"
            recovery-interval="12000" />

    <!-- Channel connecting the inbound adapter to the service activator. -->
    <int:channel id="receiveMsg" />

    <!-- Message handling: each message on "receiveMsg" is passed to mqttCaseService.receiveMsg. -->
    <int:service-activator id="startCaseService"
                           input-channel="receiveMsg" ref="mqttCaseService" method="receiveMsg" />
    <!-- The class that processes a message once the listener has received it. -->
    <bean id="mqttCaseService" class="com.qx.pos.bo.impl.MqttCaseServiceImpl" />
</beans>

 

在 Maven 的 pom.xml 中引入依赖包:

<!-- Spring Integration core and its MQTT module. -->
<!-- NOTE(review): both artifacts take their version from ${spring.version}; confirm that this
     property holds a valid Spring Integration release version. -->
<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- mqtt client  -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

 

消息处理类:

package com.qx.pos.bo.impl;

import com.qx.pos.bo.DeviceStatusService;
import com.qx.pos.bo.MqttCaseService;
import com.tzx.framework.common.util.SpringConext;
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;

/**
 * Service that consumes inbound MQTT messages.
 *
 * @author hebing
 */
public class MqttCaseServiceImpl implements MqttCaseService {
    private static final Logger logger = LoggerFactory.getLogger(MqttCaseServiceImpl.class);

    /**
     * Topic on which device status reports are uploaded.
     */
    static final String MSG_RECEIVE_TOPIC = "upload_devices_status";

    /**
     * Handles one inbound MQTT message.
     *
     * <p>Messages without a topic header are ignored. For the device status topic the payload
     * is parsed as JSON and passed to {@link DeviceStatusService#addDeviceStatus} together with
     * its {@code tenancy_id} and {@code store_id} fields. Any failure is logged and not rethrown.
     *
     * @param message the received message; the raw type is kept to match the interface method
     */
    @Override
    public void receiveMsg(Message message) {
        try {
            Object topicHeader = message.getHeaders() == null
                    ? null
                    : message.getHeaders().get(MqttHeaders.TOPIC);
            if (topicHeader == null) {
                logger.debug("未收到正确的mqtt消息");
                return;
            }

            String topic = topicHeader.toString();
            Object payload = message.getPayload();
            String msg = payload == null ? "" : payload.toString();
            Object qosHeader = message.getHeaders().get(MqttHeaders.QOS);
            String qos = qosHeader == null ? "" : qosHeader.toString();

            logger.info("接收到的mqtt消息topic:-> {} qos: -> {} message:-> {}", topic, qos, msg);

            // Only the subscribed device status topic is processed.
            if (!MSG_RECEIVE_TOPIC.equals(topic)) {
                return;
            }

            DeviceStatusService deviceStatusService =
                    (DeviceStatusService) SpringConext.getBean(DeviceStatusService.NAME);
            if (deviceStatusService == null) {
                logger.error("DeviceStatusService Bean 实例化错误!!!");
                return;
            }

            JSONObject param = JSONObject.fromObject(msg);
            String tenancyId = param.optString("tenancy_id");
            int storeId = param.optInt("store_id");
            deviceStatusService.addDeviceStatus(tenancyId, storeId, param);
        } catch (Throwable t) {
            // Best effort: a bad message must not propagate out of this method. Pass the
            // throwable itself so the stack trace is logged, since getMessage() alone is
            // often null or uninformative.
            logger.error("mqtt client 消费者接收消息失败: -> " + t.getMessage(), t);
        }
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM