1.我采用的是springboot,首先pom文件中添加mqtt需要用到的依賴
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-integration</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>org.springframework.integration</groupId> 7 <artifactId>spring-integration-stream</artifactId> 8 </dependency> 9 <dependency> 10 <groupId>org.springframework.integration</groupId> 11 <artifactId>spring-integration-mqtt</artifactId> 12 </dependency> 13 14 15 <dependency> 16 <groupId>org.eclipse.paho</groupId> 17 <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 18 <version>1.2.0</version> 19 </dependency>
2.書寫客戶端,客戶端中包括了連接服務器,訂閱主題,向主題發布消息的基本操作
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Description: * mqtt客戶端 * @author Victor */ @Component public class MqttPushClient { @Autowired private PushCallback pushCallback; private static final Logger LOGGER = LoggerFactory.getLogger(MqttPushClient.class); private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MqttPushClient.client = client; } /** * 連接 * @param host . * @param clientID . * @param username . * @param password . */ public void connect(String host, String clientID, String username, String password) { MqttClient client; try { client = new MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(30); options.setKeepAliveInterval(20); MqttPushClient.setClient(client); try { client.setCallback(pushCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 發布,默認qos為0,非持久化 * * @param topic . * @param pushMessage . */ public void publish(String topic, String pushMessage) { publish(0, false, topic, pushMessage); } /** * 發布主題和消息隊列 * * @param qos . * @param retained . * @param topic . * @param pushMessage . */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { LOGGER.error("topic not exist"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題,qos默認為0 * * @param topic . */ public void subscribe(String topic) { subscribe(topic, 0); } /** * 訂閱某個主題 * * @param topic . * @param qos . */ public void subscribe(String topic, int qos) { try { MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
還有對應回調函數,實現MqttCallback 接口之后要實現三個方法,如果出現需要斷開重連操作就在connectionLost方法中操作,如果需要從我們訂閱主題拿到數據保存到數據庫,那么就在messageArrived方法里操作。SystemConstants是我寫的一個
配置連接信息的類,大家改成自己的信息就好了。
import com.alibaba.fastjson.JSONObject;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.util.StringUtils; import java.math.BigDecimal; import java.util.Date; import java.util.UUID; /** * mqtt回調類 * * @author Victor */ @Configuration public class PushCallback implements MqttCallback { @Autowired private MqttPushClient mqttPushClient; @Autowired private DeviceDao deviceDao; @Override public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println("連接斷開,可以做重連"); mqttPushClient.connect(SystemConstants.HOST_URL + ":" + SystemConstants.PORT, "consumer" + SystemConstants.CLIENT_ID, SystemConstants.USERNAME, SystemConstants.PASSWORD); mqttPushClient.subscribe("aaa"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage message) { // subscribe后得到的消息會執行到這里面 System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內容 : " + new String(message.getPayload())); } }
如果想要把mqtt配到application中啟動只需要在application啟動類中添加PostConstruct注解即可
/** * 接受訂閱的接口和消息,mqtt消費端 */ @PostConstruct public void consumeMqttClient() { mqttPushClient.connect(SystemConstants.HOST_URL + ":" + SystemConstants.PORT, "consumer" + SystemConstants.CLIENT_ID, SystemConstants.USERNAME, SystemConstants.PASSWORD); mqttPushClient.subscribe("aaa"); }
到此為止,mqtt集成spriongboot就完成了。
新手寫博,如有不對請多指教