开始研究JAVA及MQTT,写下该文章方便使用参考
1、添加依赖 pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.35</version>
</dependency>
2、mqtt配置 application.yml
# Spring配置
spring:
# MQTT
mqtt:
# 服务器连接地址,如果有多个,用逗号隔开
host: tcp://192.168.1.1:1883
# 连接服务器默认客户端ID
clientId: mqtt_jlsId_
# 默认的消息推送主题,实际可在调用接口时指定
topic: topic_a,topic_b,topic_c
# 用户名
username:
# 密码
password:
# 连接超时
timeout: 30
# 心跳
keepalive: 30
3、获取配置信息工具类
当然可以使用注解 @Value获取配置信息
这里是网上查的资料直接使用以下类来获取
package com.projuct.junlaishun.mqtt;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* 获取配置信息
**/
public class PropertiesUtil {
public static String MQTT_HOST;
public static String MQTT_CLIENT_ID;
public static String MQTT_USER_NAME;
public static String MQTT_PASSWORD;
public static String MQTT_TOPIC;
public static Integer MQTT_TIMEOUT;
public static Integer MQTT_KEEP_ALIVE;
/**
* mqtt配置
*/
static {
Properties properties = loadMqttProperties();
MQTT_HOST = properties.getProperty("host");
MQTT_CLIENT_ID = properties.getProperty("clientId")+Math.random();
MQTT_USER_NAME = properties.getProperty("username");
MQTT_PASSWORD = properties.getProperty("password");
MQTT_TOPIC = properties.getProperty("topic");
MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));
MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));
}
private static Properties loadMqttProperties() {
InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");
Properties properties = new Properties();
try {
properties.load(inputstream);
return properties;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (inputstream != null) {
inputstream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
4、MQTT客户端 MqttConsumer类
package com.projuct.junlaishun.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class MqttConsumer implements ApplicationRunner { private static MqttClient client; @Override public void run(ApplicationArguments args) { System.out.println("初始化并启动mqtt......"); this.connect(); } /** * 连接mqtt服务器 */ private void connect() { try { // 1 创建客户端 getClient(); // 2 设置配置 MqttConnectOptions options = getOptions(); String[] topic = PropertiesUtil.MQTT_TOPIC.split(","); // 3 消息发布质量 int[] qos = getQos(topic.length); // 4 最后设置 create(options, topic, qos); } catch (Exception e) { System.out.println("mqtt连接异常:" + e); } } /** * 创建客户端 --- 1 --- */ public void getClient() { try { if (null == client) { client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence()); } System.out.println("创建mqtt客户端:" ); } catch (Exception e) { System.out.println("创建mqtt客户端异常:\" + e:" ); } } /** * 生成配置对象,用户名,密码等 --- 2 --- */ public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(PropertiesUtil.MQTT_USER_NAME); options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); // 设置超时时间 options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); // 设置会话心跳时间 options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); // 是否清除session options.setCleanSession(false); System.out.println("--生成mqtt配置对象"); return options; } /** * qos --- 3 --- */ public int[] getQos(int length) { int[] qos = new int[length]; for (int i = 0; i < length; i++) { /** * MQTT协议中有三种消息发布服务质量: * * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。 * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大 */ qos[i] = 1; } System.out.println("--设置消息发布质量"); return qos; } /** * 装载各种实例和订阅主题 --- 4 --- */ public void create(MqttConnectOptions options, String[] topic, int[] qos) { try { client.setCallback(new MqttConsumerCallback(client, options, topic, qos)); System.out.println("--添加回调处理类"); client.connect(options); } catch (Exception e) { System.out.println("装载实例或订阅主题异常:" + e); } } /** * 订阅某个主题 * * @param topic * @param qos */ public static void subscribe(String topic, int qos) { try { System.out.println("topic:" + topic); client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布,非持久化 * * qos根据文档设置为1 * * @param topic * @param msg */ public static void publish(String topic, String msg) { publish(1, false, topic, msg); } /** * 发布 */ public static void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { System.out.println("topic:" + topic + " 不存在"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); if (!token.isComplete()) { System.out.println("消息发送成功"); } } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } }
5、mqtt回调处理类
package com.projuct.junlaishun.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import java.util.Arrays;
/**
* mqtt回调处理类
*/
public class MqttConsumerCallback implements MqttCallbackExtended {
private MqttClient client;
private MqttConnectOptions options;
private String[] topic;
private int[] qos;
public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
this.client = client;
this.options = options;
this.topic = topic;
this.qos = qos;
}
/**
* 断开重连
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("MQTT连接断开,发起重连......");
try {
if (null != client && !client.isConnected()) {
client.reconnect();
System.out.println("尝试重新连接");
} else {
client.connect(options);
System.out.println("尝试建立新连接");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 接收到消息调用令牌中调用
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + Arrays.toString(topic));
}
/**
* 消息处理
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String msg = new String(message.getPayload());
System.out.println("收到topic:" + topic + " 消息:" + msg);
System.out.println("收到消息后执行具体的业务逻辑操作,比如将消息存储进数据库");
} catch (Exception e) {
System.out.println("处理mqtt消息异常:" + e);
}
}
/**
* mqtt连接后订阅主题
*/
@Override
public void connectComplete(boolean b, String s) {
try {
if (null != topic && null != qos) {
if (client.isConnected()) {
client.subscribe(topic, qos);
System.out.println("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
System.out.println("--订阅主题::" + Arrays.toString(topic));
} else {
System.out.println("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
}
}
} catch (Exception e) {
System.out.println("mqtt订阅主题异常:" + e);
}
}
}
6、编写测试方法MqttController
package com.projuct.junlaishun.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/mqtt")
public class MqttController {
/**
* 推送消息
*/
@ResponseBody
//@GetMapping(value = "/push")
@RequestMapping("sendMqttMessage")
public JSONObject push(String topic,
String message) {
MqttConsumer.publish(topic, message);
JSONObject res = new JSONObject();
res.put("code",0);
res.put("msg","测试发布主题成功");
return res;
}
}
测试记录截图

