package com.qx.pos.util;

import com.tzx.base.common.util.PosPropertyUtil;
import com.tzx.framework.common.util.SpringConext;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.util.Date;

/**
 * MQTT utility class: publishes text messages through the Spring bean named
 * {@code mqttHandler}.
 *
 * <p>Publishing can be disabled by setting the {@code mqtt.switch} property
 * (read once, when this class is loaded) to {@code off}, case-insensitively.
 *
 * @author hebing
 */
public class MqttUtil {

    /** QoS used by {@link #send(String, String)}: 2, i.e. exactly-once delivery. */
    private static final int DEFAULT_QOS = 2;

    /** Lifetime used by {@link #send(String, String)}: 48 hours, in milliseconds. */
    private static final long DEFAULT_TTL_MILLIS = 48 * 60 * 60 * 1000L;

    /**
     * Lazily resolved outbound handler. Declared {@code volatile} so that the
     * unsynchronized fast-path read in {@link #resolveHandler()} is safe.
     */
    private static volatile MqttPahoMessageHandler mqttHandler;

    /** Trimmed value of the {@code mqtt.switch} property; may be {@code null}. */
    public static String mqttSwitch;

    static {
        mqttSwitch = PosPropertyUtil.getMsg("mqtt.switch");
        if (mqttSwitch != null) {
            mqttSwitch = mqttSwitch.trim();
        }
    }

    /**
     * Sends an MQTT message with the defaults: QoS 2 (exactly-once delivery)
     * and a lifetime of 48 hours.
     *
     * @param topic   message topic
     * @param content message payload
     */
    public static void send(String topic, String content) {
        send(topic, content, DEFAULT_QOS, DEFAULT_TTL_MILLIS);
    }

    /**
     * Sends an MQTT message. Does nothing except print a notice when the
     * {@code mqtt.switch} property is {@code off}.
     *
     * @param topic   message topic, set as the {@link MqttHeaders#TOPIC} header
     * @param content message payload
     * @param qos     quality of service, set as the {@link MqttHeaders#QOS} header
     * @param ttl     lifetime in milliseconds; added to the current time to form
     *                the message's expiration date
     */
    public static void send(String topic, String content, int qos, long ttl) {
        // equalsIgnoreCase on the literal is null-safe, so no separate null check is needed.
        if ("off".equalsIgnoreCase(mqttSwitch)) {
            // println (not printf) so the notice is terminated by a line break.
            System.out.println("mqtt send message switch off !!!");
            return;
        }

        MqttPahoMessageHandler handler = resolveHandler();

        System.out.printf("mqtt send msg -- topic:%s content:%s \r\n", topic, content);

        // Build the message.
        // NOTE(review): the expiration is carried as a message header; whether the
        // handler or broker acts on it is not visible from this class - confirm.
        long expiresAtMillis = System.currentTimeMillis() + ttl;
        Message<String> message = MessageBuilder.withPayload(content)
                .setHeader(MqttHeaders.TOPIC, topic)
                .setHeader(MqttHeaders.QOS, qos)
                .setExpirationDate(new Date(expiresAtMillis))
                .build();

        // Send the message.
        handler.handleMessage(message);
    }

    /**
     * Returns the cached handler, looking up the {@code mqttHandler} bean on
     * first use. The class lock is taken only while the handler is still
     * unresolved, so steady-state sends do not contend on it.
     */
    private static MqttPahoMessageHandler resolveHandler() {
        MqttPahoMessageHandler handler = mqttHandler;
        if (handler == null) {
            synchronized (MqttUtil.class) {
                handler = mqttHandler;
                if (handler == null) {
                    handler = (MqttPahoMessageHandler) SpringConext.getApplicationContext().getBean("mqttHandler");
                    mqttHandler = handler;
                }
            }
        }
        return handler;
    }
}
Spring 配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Integration MQTT wiring.
    Outbound: the "mqttHandler" bean publishes messages.
    Inbound: the "mqttInbound" adapter subscribes to "upload_devices_status",
    forwards messages to the "receiveMsg" channel, and a service activator
    invokes mqttCaseService.receiveMsg for each of them.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.3.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- Paho client factory shared by the outbound handler and the inbound adapter. -->
    <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <!-- Broker credentials are currently disabled; uncomment to authenticate. -->
        <!--
        <property name="userName" value="${mqtt.username}"/>
        <property name="password" value="${mqtt.password}"/>
        -->
        <property name="cleanSession" value="false"/>
        <!-- NOTE(review): presumably seconds, as in the Paho connect options; confirm. -->
        <property name="keepAliveInterval" value="30"/>
        <!-- Broker address comes from the mqtt.url property. -->
        <property name="serverURIs">
            <array>
                <value>${mqtt.url}</value>
            </array>
        </property>
    </bean>

    <!-- Outbound handler: asynchronous sends, default QoS 2, messages not retained. -->
    <bean id="mqttHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="pos-server-sender"/>
        <constructor-arg name="clientFactory" ref="clientFactory"/>
        <property name="async" value="true"/>
        <property name="defaultQos" value="2"/>
        <property name="defaultRetained" value="false"/>
        <!-- NOTE(review): presumably milliseconds; confirm against the Spring Integration MQTT docs. -->
        <property name="completionTimeout" value="15000"/>
    </bean>

    <!-- Message adapter: subscribes to the device status topic and forwards to the receiveMsg channel. -->
    <int-mqtt:message-driven-channel-adapter
            id="mqttInbound"
            client-id="robot-devices-receiver"
            url="${mqtt.url}"
            topics="upload_devices_status"
            client-factory="clientFactory"
            send-timeout="20000"
            channel="receiveMsg"
            recovery-interval="12000" />

    <int:channel id="receiveMsg" />

    <!-- Message handler class -->
    <int:service-activator id="startCaseService" input-channel="receiveMsg" ref="mqttCaseService" method="receiveMsg" />

    <!-- Bean that processes each message once the listener has received it. -->
    <bean id="mqttCaseService" class="com.qx.pos.bo.impl.MqttCaseServiceImpl" />
</beans>
Maven pom.xml 引入依赖包:
<!--
    Spring Integration core and its MQTT module.
    NOTE(review): both artifacts are versioned with ${spring.version}. Spring
    Integration is released separately from Spring Framework, so confirm that a
    Spring Integration release with exactly that version number exists.
-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>${spring.version}</version>
</dependency>
<!-- mqtt client (Eclipse Paho, MQTT v3) -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>
消息处理类:
package com.qx.pos.bo.impl; import com.qx.pos.bo.DeviceStatusService; import com.qx.pos.bo.MqttCaseService; import com.tzx.framework.common.util.SpringConext; import net.sf.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; /** * Mqtt 消息接收服务 * @author hebing */ public class MqttCaseServiceImpl implements MqttCaseService { private final Logger logger = LoggerFactory.getLogger(MqttCaseServiceImpl.class); /** * 上报设备状态TOPIC */ static final String MSG_RECEIVE_TOPIC = "upload_devices_status"; @Override public void receiveMsg(Message message) { try { if (message.getHeaders() != null && message.getHeaders().get(MqttHeaders.TOPIC) != null) { String topic = message.getHeaders().get(MqttHeaders.TOPIC).toString(); String msg = ""; if (message.getPayload() != null) { msg = message.getPayload().toString(); } String qos = ""; if (message.getHeaders().get(MqttHeaders.QOS) != null) { qos = message.getHeaders().get(MqttHeaders.QOS).toString(); } logger.info("接收到的mqtt消息topic:-> "+ topic + " qos: -> " + qos + " message:-> " + msg); // 订阅的消息topic 上报设备状态 if (MSG_RECEIVE_TOPIC.equals(topic)) { DeviceStatusService deviceStatusService = (DeviceStatusService) SpringConext.getBean(DeviceStatusService.NAME); if (deviceStatusService == null) { logger.error("DeviceStatusService Bean 实例化错误!!!"); } else { JSONObject param = JSONObject.fromObject(msg); String tenancyId = param.optString("tenancy_id"); int storeId = param.optInt("store_id"); deviceStatusService.addDeviceStatus(tenancyId, storeId, param); } } } else { logger.debug("未收到正确的mqtt消息"); } } catch (Throwable t) { logger.error("mqtt client 消费者接收消息失败: -> " + t.getMessage()); } } }