依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.3</version> </dependency>
配置
spring:
  mqtt:
    clientId: test1
    url: tcp://192.168.1.24:1883
    username: admin
    password: 123456
配置類
MyMqttClient.java
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Component public class MyMqttClient { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; @Autowired private MqttRecieveCallback mqttRecieveCallback; @Autowired private MqttTwoRecieveCallback mqttTwoRecieveCallback; @Value("${spring.mqtt.url}") private String serverURI; @Value("${spring.mqtt.clientId}") private String clientId; @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @PostConstruct public void init() { //初始化連接設置對象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if (null != mqttConnectOptions) { // true可以安全地使用內存持久性作為客戶端斷開連接時清除的所有狀態 mqttConnectOptions.setCleanSession(true); // 設置連接超時 mqttConnectOptions.setConnectionTimeout(10); //設置賬號密碼 // mqttConnectOptions.setUserName(username); // mqttConnectOptions.setPassword(password.toCharArray()); // 設置持久化方式 memoryPersistence = new MemoryPersistence(); if (null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient(serverURI, clientId, memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { } } else { System.out.println("mqttConnectOptions對象為空"); } 
System.out.println(mqttClient.isConnected()); //設置連接和回調 if (null != mqttClient) { if (!mqttClient.isConnected()) { // 創建連接 try { System.out.println("創建連接"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } else { System.out.println("mqttClient為空"); } System.out.println(mqttClient.isConnected()); if (mqttClient.isConnected()) { try { //添加回調方法1 mqttClient.subscribe("topic/test1", 2, mqttRecieveCallback); //添加回調方法2 mqttClient.subscribe("topic/test2", 2, mqttTwoRecieveCallback); } catch (MqttException e) { e.printStackTrace(); } } } // 關閉連接 @PreDestroy public void closeConnect() { //關閉存儲方式 if (null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("memoryPersistence is null"); } // 關閉連接 if (null != mqttClient) { if (mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is not connect"); } } else { System.out.println("mqttClient is null"); } } // 發布消息 public void publishMessage(String pubTopic, String message, int qos,Boolean retained) { if (null != mqttClient && mqttClient.isConnected()) { System.out.println("發布消息 " + mqttClient.isConnected()); System.out.println("id:" + mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); mqttMessage.setRetained(retained); MqttTopic topic = mqttClient.getTopic(pubTopic); if (null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if (!publish.isComplete()) { System.out.println("消息發布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } else { reConnect(); } } // 重新連接 public void reConnect() { if (null != mqttClient) { if 
(!mqttClient.isConnected()) { if (null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttConnectOptions is null"); } } else { System.out.println("mqttClient is null or connect"); } } else { init(); } } // 訂閱主題 public void subTopic(String topic) { if (null != mqttClient && mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is error"); } } // 清空主題 public void cleanTopic(String topic) { if (null != mqttClient && !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is error"); } } }
配置參數說明:
- cleanSession :把配置裡的 cleanSession 設為 false 後,客戶端掉線時服務器端不會清除 session;客戶端使用同一個 clientId 重連後,無需重新訂閱即可繼續接收之前訂閱主題的消息,並會收到離線期間發布的 QoS 1、QoS 2 消息(注意:clientId 不能修改,否則會被當作新的會話)
- Retained:如果PUBLISH消息的RETAIN標記位被設置為1,則稱該消息為“保留消息”(只會保存一條);Broker會存儲每個Topic的最后一條保留消息及其Qos,當訂閱該Topic的客戶端上線后,Broker需要將該消息投遞給它。這可以讓新訂閱的客戶端馬上得到發布方的最新的狀態值,而不必要等待。
保留消息的刪除
- 方式1:發送空消息體的保留消息;
- 方式2:發送最新的保留消息覆蓋之前的(推薦);
- setUserName: 設置用戶名
- setPassword: 設置密碼
- setCleanSession: 設置是否清除會話
- setKeepAliveInterval: 設置心跳間隔
- setConnectionTimeout: 設置連接超時時間
- setAutomaticReconnect: 設置是否自動重連
回調類一
MqttRecieveCallback.java
import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqttRecieveCallback implements MqttCallback, IMqttMessageListener { @Autowired private MyMqttClient client; @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) { System.out.println("Client 接收消息主題 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息內容 : " + new String(message.getPayload())); /** * 發送消息 */ client.publishMessage("topic/test2","2",2,false); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }
回調類2
MqttTwoRecieveCallback.java
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; @Component public class MqttTwoRecieveCallback implements MqttCallback, IMqttMessageListener { @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client2 接收消息主題 : " + topic); System.out.println("Client2 接收消息Qos : " + message.getQos()); System.out.println("Client2 接收消息內容 : " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }
使用
// Usage sketch: inject the MyMqttClient bean, then publish `text` to "tra_topic" with QoS 2 and retained = false.
@Autowired private MyMqttClient myMqttClient; myMqttClient.publishMessage("tra_topic",text,2,false);
如果出現報錯:MQTT(32202): 正在發布過多的消息
增加配置
mqttConnectOptions.setMaxInflight(1000);
如果報錯:
Action:
Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.
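這是因為出現了循環依賴:MyMqttClient 注入了 MqttRecieveCallback,而 MqttRecieveCallback 又注入了 MyMqttClient。可以修改代碼打破這個循環,例如在 MqttRecieveCallback 的 MyMqttClient 字段上加 @Lazy 注解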
或者增加配置
spring:
  main:
    allow-circular-references: true
SSL連接方式
<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on --> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <version>1.70</version> </dependency>
SSLUtils.java
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openssl.PEMKeyPair; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileReader; import java.security.KeyPair; import java.security.KeyStore; import java.security.Security; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; public class SSLUtils { public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile, final String password) throws Exception { Security.addProvider(new BouncyCastleProvider()); // load CA certificate X509Certificate caCert = null; FileInputStream fis = new FileInputStream(caCrtFile); BufferedInputStream bis = new BufferedInputStream(fis); CertificateFactory cf = CertificateFactory.getInstance("X.509"); while (bis.available() > 0) { caCert = (X509Certificate) cf.generateCertificate(bis); } // load client certificate bis = new BufferedInputStream(new FileInputStream(crtFile)); X509Certificate cert = null; while (bis.available() > 0) { cert = (X509Certificate) cf.generateCertificate(bis); } // load client private key PEMParser pemParser = new PEMParser(new FileReader(keyFile)); Object object = pemParser.readObject(); JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); KeyPair key = converter.getKeyPair((PEMKeyPair) object); pemParser.close(); // CA certificate is used to authenticate server KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); caKs.load(null, null); caKs.setCertificateEntry("ca-certificate", caCert); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(caKs); // client key and certificates are sent to server so it can 
authenticate KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", cert); ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory .getDefaultAlgorithm()); kmf.init(ks, password.toCharArray()); // finally, create SSL socket factory SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return context.getSocketFactory(); } }
使用增加
// 設置 socket factory String caFilePath = "/cacert.pem"; String clientCrtFilePath = "/client.pem"; String clientKeyFilePath = "/client.key"; SSLSocketFactory socketFactory = null; try { socketFactory = SSLUtils.getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, ""); } catch (Exception e) { e.printStackTrace(); } mqttConnectOptions.setSocketFactory(socketFactory);
另一種方式(推薦):https://www.cnblogs.com/pxblog/p/17058417.html