Spring Boot + MQTT 物聯網開發


最近這一年裡,在項目實戰中遇到了 MQTT 開發,今天就大致總結一下 MQTT 在 Spring Boot 中的使用。

1. 引入依賴(pom.xml)

 <!-- MQTT support: Spring Integration starter plus the MQTT module -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- NOTE(review): none of the classes shown in this article reference spring-integration-stream; confirm it is needed -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- The Java classes below import org.eclipse.paho.client.mqttv3.*; presumably provided transitively by this module (confirm) -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Optional build-time helper for configuration metadata; not required at runtime -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-configuration-processor</artifactId>
	<optional>true</optional>
</dependency>

2. 項目啟動時建立連接

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.slife.cws.mqtt.component.MqttPushClient;
import com.slife.cws.mqtt.config.MqttConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * Opens the MQTT connection once the Spring Boot application has finished starting.
 */
@Component
@Slf4j
public class MqttApplicationRunner implements ApplicationRunner {

	@Autowired
	private MqttConfig mqttConfig;

	/**
	 * Connects to the broker described by the injected {@link MqttConfig}.
	 *
	 * @param args application arguments (unused)
	 * @throws Exception declared by {@link ApplicationRunner#run}
	 */
	@Override
	public void run(ApplicationArguments args) throws Exception {
		log.info("===============>>>Mqtt is run starting:<<==================");
		// connect() subscribes to the configured topics itself after a successful connect,
		// so subscribing again here would only unsubscribe and re-subscribe the same topics.
		new MqttPushClient().connect(mqttConfig);
	}

}

3.相關配置及實現類

①、配置(以下提供 application.properties 與 application.yml 兩種寫法,二選一)

# MQTT settings read by MqttConfig through @Value("${spring.mqtt.*}")
spring.mqtt.url=tcp://127.0.0.1
spring.mqtt.username= nbew
spring.mqtt.password= 123456
spring.mqtt.client-id= 100201101
# Required: MqttConfig injects ${spring.mqtt.id} without a default value
spring.mqtt.id= test10000011
# Comma-separated list of topics to subscribe to
spring.mqtt.topics= top
spring.mqtt.completion-timeout= 3000
spring.mqtt.timeout= 120
spring.mqtt.keep-alive= 20
# One QoS value per entry in spring.mqtt.topics (the two lists must have the same length)
spring.mqtt.qos= 1

# MQTT settings (YAML variant) read by MqttConfig through @Value("${spring.mqtt.*}")
spring:
  mqtt:
    # Broker URI
    url: tcp://mqtt.rootcloudapp.com:1883
    # NOTE(review): credentials are stored in plain text here; consider externalising them
    username: 0ba851e2e83609b9
    password: 81757448a4df0d73
    # Prefix of the MQTT client id; MqttPushClient.connect appends the current time in milliseconds
    client-id: 0ba851e2e83609b9
    # Communication identifier injected into MqttConfig.id
    id: test10000011
    # Topic(s) to subscribe to; injected as a String[]
    topics: v4/p/post/thing/live/json/1.1
    completion-timeout: 3000
    # Passed to MqttConnectOptions.setConnectionTimeout
    timeout: 30
    # Passed to MqttConnectOptions.setKeepAliveInterval
    keep-alive: 60
    # QoS value(s) for the subscribed topic(s); injected as an int[]
    qos: 1

  

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import lombok.Data;

/**
 * MQTT connection settings read from the {@code spring.mqtt.*} properties.
 * <p>
 * Each field is injected individually with {@code @Value} (not bound via
 * {@code @ConfigurationProperties}); Lombok {@code @Data} generates the accessors.
 * The class declares no {@code @Bean} methods, so a single {@code @Component} stereotype is enough.
 */
@Data
@Component
public class MqttConfig {

	/**
	 * Broker connection URL.
	 */
	@Value("${spring.mqtt.url}")
	private String url;

	/**
	 * User name used to authenticate against the broker.
	 */
	@Value("${spring.mqtt.username}")
	private String username;

	/**
	 * Password used to authenticate against the broker.
	 */
	@Value("${spring.mqtt.password}")
	private String password;

	/**
	 * Client id (used as a prefix by the connecting code).
	 */
	@Value("${spring.mqtt.client-id}")
	private String clientId;

	/**
	 * Communication identifier.
	 */
	@Value("${spring.mqtt.id}")
	private String id;

	/**
	 * Topics to subscribe to.
	 */
	@Value("${spring.mqtt.topics}")
	private String[] topic;

	/**
	 * Connection timeout.
	 */
	@Value("${spring.mqtt.timeout}")
	private int timeout;

	/**
	 * Keep-alive (heartbeat) interval.
	 */
	@Value("${spring.mqtt.keep-alive}")
	private int keepAlive;

	/**
	 * QoS level for each subscribed topic; must have as many entries as {@link #topic}.
	 */
	@Value("${spring.mqtt.qos}")
	private int[] qos;

	/**
	 * Completion timeout. Previously this field had no {@code @Value} and was therefore never
	 * populated; the default keeps start-up working when the property is absent.
	 */
	@Value("${spring.mqtt.completion-timeout:3000}")
	private int completionTimeout;

}

  

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Registers an {@link MqttPushClient} in the Spring context under the bean name
 * {@code mqttPushClient}.
 *
 * @author youli
 */
@Component
public class Mqttbean {

	/**
	 * Bean factory method.
	 *
	 * @return a newly constructed {@link MqttPushClient}
	 */
	@Bean("mqttPushClient")
	public MqttPushClient getMqttPushClient() {
		return new MqttPushClient();
	}

}

  

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.slife.cws.mqtt.config.MqttConfig;

import lombok.extern.slf4j.Slf4j;

/**
 * @Package com.shhw.mqtt.component
 * @ClassName: MqttPushClient
 * @Description: MqttClient客戶端代碼
 * @Author youli
 * @date 2020年10月16日
 * @CopyRight:上海成生科技有限公司
 */
@Slf4j
public class MqttPushClient {

	private static MqttClient client;

	public static MqttClient getClient() {
		return client;
	}

	public static void setClient(MqttClient client) {
		MqttPushClient.client = client;
	}

	private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
		// MQTT連接設置
		MqttConnectOptions option = new MqttConnectOptions();
		// 設置是否清空session,false表示服務器會保留客戶端的連接記錄,true表示每次連接到服務器都以新的身份連接
		option.setCleanSession(false);
		// 設置連接的用戶名
		option.setUserName(userName);
		// 設置連接的密碼
		option.setPassword(password.toCharArray());
		// 設置超時時間 單位為秒
		option.setConnectionTimeout(outTime);
		// 設置會話心跳時間 單位為秒 服務器會每隔(1.5*keepTime)秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
		option.setKeepAliveInterval(KeepAlive);
		// setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
		// option.setWill(topic, "close".getBytes(), 2, true);
		option.setMaxInflight(1000);
		log.info("================>>>MQTT連接認證成功<<======================");
		return option;
	}

	/**
	 * 連接
	 */
	public void connect(MqttConfig mqttConfig) {
		MqttClient client;
		try {
			String clientId = mqttConfig.getClientId();
			clientId += System.currentTimeMillis();
			client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence());
			MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
					mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
			MqttPushClient.setClient(client);
			try {
				client.setCallback(new PushCallback<Object>(this, mqttConfig));
				if (!client.isConnected()) {
					client.connect(options);
					log.info("================>>>MQTT連接成功<<======================");
					 //訂閱主題
					subscribe(mqttConfig.getTopic(), mqttConfig.getQos());
				} else {// 這里的邏輯是如果連接不成功就重新連接
					client.disconnect();
					client.connect(options);
					log.info("===================>>>MQTT斷連成功<<<======================");
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 斷線重連
	 *
	 * @throws Exception
	 */
	public Boolean reConnect() throws Exception {
		Boolean isConnected = false;
		if (null != client) {
			client.connect();
			if (client.isConnected()) {
				isConnected = true;
			}
		}
		return isConnected;
	}

	/**
	 * 發布,默認qos為0,非持久化
	 *
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(String topic, String pushMessage) {
		publish(0, false, topic, pushMessage);
	}

	/**
	 * 發布
	 *
	 * @param qos
	 * @param retained
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(int qos, boolean retained, String topic, String pushMessage) {
		MqttMessage message = new MqttMessage();
		message.setQos(qos);
		message.setRetained(retained);
		message.setPayload(pushMessage.getBytes());
		MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
		if (null == mTopic) {
			log.error("===============>>>MQTT topic 不存在<<=======================");
		}
		MqttDeliveryToken token;
		try {
			token = mTopic.publish(message);
			token.waitForCompletion();
		} catch (MqttPersistenceException e) {
			e.printStackTrace();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 發布消息的服務質量(推薦為:2-確保消息到達一次。0-至多一次到達;1-至少一次到達,可能重復), retained
	 * 默認:false-非持久化(是指一條消息消費完,就會被刪除;持久化,消費完,還會保存在服務器中,當新的訂閱者出現,繼續給新訂閱者消費)
	 *
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(int qos, String topic, String pushMessage) {
		publish(qos, false, topic, pushMessage);
	}

	/**
	 * 訂閱某個主題,qos默認為0
	 * 
	 * @param topic
	 */
	public void subscribe(String[] topic) {
		subscribe(topic, null);
	}

	/**
	 * 訂閱某個主題
	 *
	 * @param topic
	 * @param qos
	 */
	public void subscribe(String[] topic, int[] qos) {
		try {
			MqttPushClient.getClient().unsubscribe(topic);
			MqttPushClient.getClient().subscribe(topic, qos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

  

import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @Package com.shhw.mqtt.component
 * @ClassName: MqttSender
 * @Description: 主題發布
 * @Author youli
 * @date 2020年10月16日
 * @CopyRight:上海成生科技有限公司
 */
@Component(value = "mqttSender")
@Slf4j
public class MqttSender {

	@Async
	public void send(String queueName, String msg) {
		log.debug("=====================>>>>發送主題:{},  msg:{}", queueName,msg);
		publish(2, queueName, msg);
	}

	/**
	 * 發布,默認qos為0,非持久化
	 * 
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(String topic, String pushMessage) {
		publish(1, false, topic, pushMessage);
	}

	/**
	 * 發布
	 * 
	 * @param qos
	 * @param retained
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(int qos, boolean retained, String topic, String pushMessage) {
		MqttMessage message = new MqttMessage();
		message.setQos(qos);
		message.setRetained(retained);
		message.setPayload(pushMessage.getBytes());
		MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
		if (null == mTopic) {
			log.error("===================>>>MQTT topic 不存在<<=================");
		}
		MqttDeliveryToken token;
		try {
			token = mTopic.publish(message);
			token.waitForCompletion();
		} catch (MqttPersistenceException e) {
			log.error("============>>>publish fail", e);
			e.printStackTrace();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 發布消息的服務質量(推薦為:2-確保消息到達一次。0-至多一次到達;1-至少一次到達,可能重復), retained
	 * 默認:false-非持久化(是指一條消息消費完,就會被刪除;持久化,消費完,還會保存在服務器中,當新的訂閱者出現,繼續給新訂閱者消費)
	 * 
	 * @param topic
	 * @param pushMessage
	 */
	public void publish(int qos, String topic, String pushMessage) {
		publish(qos, false, topic, pushMessage);
	}

}

  

import javax.annotation.Resource;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

import com.slife.cws.mqtt.config.MqttConfig;
import com.slife.cws.mqtt.pojo.MqttResponseBody;
import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;
import com.slife.cws.mqtt.service.MqttService;
import com.slife.cws.mqtt.service.impl.MqttServiceImpl;
import com.slife.cws.utils.JSONUtils;
import com.slife.cws.utils.SpringUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * @Package com.shhw.mqtt.component
 * @ClassName: PushCallback
 * @Description: 進行雙向通信的時候,監聽訂閱的客戶端和主題是否處於連接狀態
 * @Author youli
 * @date 2020年10月16日
 * @CopyRight:上海成生科技有限公司
 */
@Slf4j
@Component
public class PushCallback<component> implements MqttCallback {

	private MqttPushClient client;

	private MqttConfig mqttConfiguration;

	@Resource
	MqttService mqttService;

	public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) {
		this.client = client;
		this.mqttConfiguration = mqttConfiguration;
	}

	@Override
	public void connectionLost(Throwable cause) {
		/** 連接丟失后,一般在這里面進行重連 **/
		if (client != null) {
			while (true) {
				try {
					log.info("==============》》》[MQTT] 連接丟失,嘗試重連...");
					MqttPushClient mqttPushClient = new MqttPushClient();
					mqttPushClient.connect(mqttConfiguration);
					if (MqttPushClient.getClient().isConnected()) {
						log.info("=============>>重連成功");
					}
					break;
				} catch (Exception e) {
					log.error("=============>>>[MQTT] 連接斷開,重連失敗!<<=============");
					continue;
				}
			}
		}
		log.info(cause.getMessage());
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		// publish后會執行到這里
		log.info("pushComplete==============>>>" + token.isComplete());
	}

	/**
	 * 監聽對應的主題消息
	 * 
	 * @param topic
	 * @param message
	 * @throws Exception
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		// subscribe后得到的消息會執行到這里面
		log.info("============》》接收消息主題 : " + topic);
		log.info("============》》接收消息Qos : " + message.getQos());
		log.info("============》》接收消息內容原始內容 : " + new String(message.getPayload()));
		log.info("============》》接收消息內容GB2312 : " + new String(message.getPayload(), "GB2312"));
		log.info("============》》接收消息內容UTF-8 : " + new String(message.getPayload(), "UTF-8"));
		try {
			if (topic.equals("datapoint")) {
				MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"),
						MqttResponseBody.class);
				MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
				mqttService.messageArrived(mqttResponseBody);
			} else if (topic.equals("heartbeat")) {
				MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils
						.jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class);
				MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
				mqttService.messageHeartbeat(mqttResponseHeartbeat);
			}
		} catch (Exception e) {
			e.printStackTrace();
			log.info("============》》接收消息主題異常 : " + e.getMessage());
		}
	}

}

  

import com.slife.cws.mqtt.pojo.MqttResponseBody;
import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;

/**
 * Application-level MQTT operations: sending a message and handling the two kinds of
 * inbound messages that PushCallback dispatches.
 */
public interface MqttService {

	/**
	 * Sends a message. Takes no arguments; the topic and payload are chosen by the implementation.
	 *
	 * @author youli
	 */
	void sendMessage();

	/**
	 * Handles a data message; PushCallback calls this for messages on the "datapoint" topic.
	 *
	 * @author youli
	 * @param mqttResponseBody parsed message body
	 */
	void messageArrived(MqttResponseBody mqttResponseBody);

	/**
	 * Handles a device heartbeat; PushCallback calls this for messages on the "heartbeat" topic.
	 *
	 * @author youli
	 * @param mqttResponseHeartbeat parsed heartbeat message
	 */
	void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat);
}

  

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.slife.cws.mqtt.component.MqttSender;
import com.slife.cws.mqtt.config.MqttConfig;
import com.slife.cws.mqtt.pojo.MqttResponseBody;
import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;
import com.slife.cws.mqtt.service.MqttService;
import com.slife.cws.utils.JSONUtils;
import com.slife.ews.service.DatumrealtimeService;

import lombok.extern.slf4j.Slf4j;

/**
 * Default {@link MqttService}: publishes through {@link MqttSender} and hands inbound
 * messages to {@code DataService}.
 */
@Service
@Slf4j
public class MqttServiceImpl implements MqttService {

	@Autowired
	MqttConfig mqttConfig;

	@Autowired
	private MqttSender mqttSender;

	@Autowired
	DataService dataService;

	/**
	 * Sends to the first configured topic.
	 */
	@Override
	public void sendMessage() {
		final String firstTopic = mqttConfig.getTopic()[0];
		// NOTE(review): the payload is a null placeholder; supply the real JSON before using this.
		final String payload = null;
		mqttSender.send(firstTopic, payload);
	}

	/**
	 * Logs the message as JSON and persists it.
	 *
	 * @param mqttResponseBody parsed message body
	 */
	@Override
	public void messageArrived(MqttResponseBody mqttResponseBody) {
		log.info("接口的消息:{}", JSONUtils.beanToJson(mqttResponseBody));
		dataService.save(mqttResponseBody);
	}

	/**
	 * Logs the heartbeat as JSON and forwards it to the data service.
	 *
	 * @param mqttResponseHeartbeat parsed heartbeat message
	 */
	@Override
	public void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat) {
		log.info("監聽心跳信息:{}", JSONUtils.beanToJson(mqttResponseHeartbeat));
		dataService.heartbeat(mqttResponseHeartbeat);
	}

}

  

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.slife.cws.mqtt.component.MqttPushClient;
import com.slife.cws.mqtt.component.MqttSender;
import com.slife.cws.mqtt.config.MqttConfig;

/**
 * Test endpoints for publishing a sample message and subscribing to a fixed set of topics.
 */
@RestController
public class MessageController {

	@Autowired
	MqttConfig mqttConfig;

	@Autowired
	private MqttSender mqttSender;

	@Autowired
	private MqttPushClient mqttPushClient;

	/**
	 * Publishes a fixed sample JSON message so other clients can test message reception.
	 *
	 * @param topic topic to publish to (request parameter); must not be blank
	 * @return {@code "ok"} once the message has been handed to the sender, or an error text
	 *         when no topic was supplied
	 */
	@RequestMapping("/sendMqttMessage")
	public String sendMqttMessage(String topic) {
		// The sender publishes asynchronously, so a missing topic would otherwise fail out of
		// sight while this endpoint still answered "ok".
		if (topic == null || topic.trim().isEmpty()) {
			return "topic is required";
		}
		String jsonStr = "{\"truckPic\":\"20201029111.jpg\",\"httpRootPic\":\"url\"}";
		mqttSender.send(topic, jsonStr);
		return "ok";
	}

	/**
	 * Subscribes the shared client to four test topics with QoS 1 each.
	 *
	 * @return a fixed confirmation text
	 */
	@RequestMapping("/mqttop")
	public String mqttop() {
		String[] topics = { "test_topic1", "test_topic2", "test_topic3", "test_topic4" };
		int[] qos = { 1, 1, 1, 1 };
		mqttPushClient.subscribe(topics, qos);
		return "訂閱主題";
	}

	/**
	 * Prints the current time in milliseconds (scratch helper).
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		long time = System.currentTimeMillis();
		System.out.println(time);
	}

}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM