MQTT 消息 發布 訂閱



 

當連接向一個mqtt服務器時,clientId必須是唯一的。設置一樣,導致client.setCallback總是走到 connectionLost回調。報connection reset。調查一天才發現是clientid重復導致。

client = new MqttAsyncClient(serverURIString, "client-id");

 


 

clientId是用來保存會話信息。

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);

當服務器將message發布向所有訂閱過的客戶端,就會清除這個message。如果當前客戶端不在線,等它連接時發送。

 

session與client之間的關系是怎樣的?

這樣的,比如你一個板子,作為客戶端,發起mqtt的連接請求connect到mqtt服務器,比如說就是emqtt服務吧,emqtt服務端收到這個板子的連接請求之后,在tcp層上會和板子建立一個tcp的連接,在emqtt內部,會產生一個進程,和這個板子做數據通訊,同時還會產生一個進程,叫session,這個sessoin是專門管理這個板子訂閱的主題,其它板子如果發布了這個板子感興趣的主題的時候,也會發到這個板子對應的這個session里面,如果這個session收到訂閱的主題之后,發現對用的client還活着,就通過這個client把數據經過tcp發到這個板子上,如果發現client已經沒有了,就是說板子和服務端斷掉了,那么session就會把收到的訂閱的主題,先保存在session里面,下次板子連接上了,而且cleansession=false,那么這個session就不會清除,在這次連接時,就會把以前收到的訂閱消息,發給板子,大概就是這個意思。


 

參考:

http://www.blogjava.net/yongboy/archive/2014/02/15/409893.html

http://www.cnblogs.com/znlgis/p/4930990.html

http://blog.csdn.net/ljf10010/article/details/51424506

paho客戶端示例

 

https://github.com/eclipse/paho.mqtt.java/tree/master/org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test

http://www.eclipse.org/paho/files/javadoc/index.html api文檔

ibm客戶端paho示例:

http://www.programcreek.com/java-api-examples/index.php?source_dir=streamsx.messaging-master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java

hivemq的mqtt詳解:http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

 

 http://blog.csdn.net/wuyinxian/article/details/38826259

MQTT Protocol Manual(Apollo中MQTT協議解析)

 
 
 

 
 MQTT協議
Apollo允許客戶端通過開放的MQTT協議連接。該協議主要是用在資源有限的驅動上,以及網絡不穩定的情況下使用,是一個訂閱、發布模型。這種驅動通常不適用類似http,stomp這類基於文本,或者類似openfire,AMQP等傳統二進制協議。MQTT是一個簡介的二進制協議,適用這類驅動資源受限,而且是不穩定的網絡條件下。
之前的穩定發布版本中,MQTT是作為一個Apollo的一個插件提供的。但是現在,這個插件已經變為開發項目的一部分。MQTT在Apollo中已經不需要其他配置文件或者是第三方插件支持了。
MQTT是一個線路層的協議,任何實現該協議的客戶端都可以連接到Apollo。當然也可以整合其他MQTT兼容的消息代理中。
更多有關MQTT協議內容,參考 the MQTT Specification
 MQTT協議配置
為了開始使用MQTT協議,首先使用MQTT3.1協議的客戶端,連接到Apollo正在監聽端口。Apollo會做協議檢測,而且自動識別MQTT連接,而且將連接作為MQTT協議處理。你不必要為MQTT協議打開一個端口(STomp,Openfire,AMQP等都是自動識別)。如果你一定指定連接的協議,有下面兩種方式:你可以選擇不用協議識別,而是為MQTT指定連接:
<connector id="tcp" bind="tcp://0.0.0.0:61613" protocol="mqtt"/>
或者你可以限制哪種協議可以被自動識別。通過下面的<detece>配置方式:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <detect protocols="mqtt openwire" />
</connector>
<detect> 下protocols 對應的參數通過空格來隔開支持的通信協議。如果只支持一種協議,就不要空格,默認情況下對任何協議生效。
如果你想調整MQTT默認設置,在apollo.xml文件中有一個<connector> 元素,通過MQTT參數配置:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <mqtt max_message_length="1000" />
</connector>
MQTT元素支持下面幾個參數:
  • max_message_length : The size (in bytes) of the largest message that can be sent to the broker. Defaults to 100MB(broker能接受的最大消息量:默認是100M)
  • protocol_filters : A filter which can filter frames being sent/received to and from a client. It can modify the frame or even drop it.(一個控制發送和接收,Client的過濾器框架。可以修改,刪除這個框架)
  • die_delay : How long after a connection is deemed to be “dead” before the connection actually closes; default: 5000ms(在實際斷開連接之前,會有默認5000ms的時間被認為連接已經dead)
mqtt 配置元素也可以用來控制目的消息頭的解析。下面是支持的參數:
  • queue_prefix : a tag used to identify destination types; default: null(用來確認目的地類型)
  • path_separator : used to separate segments in a destination name; default: /(用來分割目的地名稱)
  • any_child_wildcard : indicate all child-level destinations that match the wildcard; default: +(識別子目錄)
  • any_descendant_wildcard : indicate destinations that match the wildcard recursively; default:#(目標地址通配符)
  • regex_wildcard_start : pattern used to identify the start of a regex(表示正則表達開始)
  • regex_wildcard_end : pattern used to identify the end of a regex(表示正則表達結束)
  • part_pattern : allows you to specify a regex that constrains the naming of topics. (你可以指定正則表達規則)default: [ a-zA-Z0-9\_\-\%\~\:\(\)]+
 Client 可用函數庫
Apollo 支持MQTT3.1 協議,下面是可用的Clients:
如果要找到新支持的Clients ,可以檢索: the MQTT website for its software
在目錄example 目錄下,你可以找到一些例子,實現了與broker之間收發。
 connecting
為了確保broker配置文件的安全,所以只允許一個admin 用戶連接,默認的用戶名和密碼是:admin ,password.
Mqtt 客戶端不能specify 虛擬主機(更多請看:see  the section on Virtual Hosts in the user guide),以至於默認情況下虛擬主機已經被使用了。通常第一虛擬主機定義在apollo.xml文件中。
 Destination 類型
MQTT協議是訂閱,發布協議,是不允許真正的利用隊列點對點的消息收發。因此Apollo僅允許利用主題,還進行MQTT消息發送。訂閱的概念和持久的主題訂閱 和其他協議提到的有些類似,同時也被MQTT CONNECT 框架的clean session屬性控制。
 Clean Sessions
但一個Client 發送一個連接,這個連接中clean session 被設置為false,那么之前連接中有相同Client_id 的session 將會被重復使用。這就意味着Client斷開了,訂閱依然能收到消息。這就等同與同Apollo建立一個長訂閱。
如果 clean session 設置為true ,那么新session就開始了,其他的session會慢慢消失,刪除。這就是Apollo中定義的普通的主題訂閱。
 Topic Retained Messages
如果消息被發布的同時retain 標記被設置,消息將被主題記住,以至於新的訂閱到達,最近的retain 消息會被發送到訂閱者。比如說:你想發布一個參數,而且你想讓最新的這個參數發布到總是可用的訂閱了這個主題的客戶端上,你就設置在PUBLISH 框架上設置retain 標簽。
注意:retained 消息 不會被設置成retained 在 QoS設置為零的broker 重啟過程中。

Last Will and Testament Message

當Client第一次連接的時候,有一個will 消息和一個更QoS相關的消息會跟你有關。will消息是一個基礎消息,這個基礎消息只有在連接異常或者是掉線的時候才會被發送。一般用在你有一個設備,當他們掉了的時候,你需要知道。所以如果一個醫療Client從broker掉線,will消息將會作為一個鬧鍾主題發送,而且會被系統作為高優先級提醒。

Reliable Messaging

MQTT協議允許Client 發布消息的時候指定Qos參數:
  • At Most Once (QoS=0)
  • At Least Once (QoS=1)
  • Exactly Once (QoS=2)
 最多一次
這個設置時推送消息給Client,可靠性最低的一種。如果設置Qos=0,那broker就不會返回結果碼,告訴你他收到消息了,也不會在失敗后嘗試重發。這有點像不可靠消息,如JMS。
 至少一次
該設置會確保消息會被至少一次推送到Client。如果推送設置為至少推送一次,Apollo會返回一個回調函數,確保代理已經收到消息,而且確保會確保推送該消息。如果Client 將發布了一個Qos=1的消息,如果在指定的時間內沒有收到回復,Client會希望重新發布這個消息。所以可能存在這種情況:代理收到一個需要推送的消息,然后又收到一個消息推送到同一個Client。所以如果傳輸過程中PUBACK丟失,Client會重新發送,而且不會去檢測是否是重發,broker就將消息發送到訂閱主題中。
 恰好一次
該設置是可靠等級最高的。他會確保發布者不僅僅會推送,而且不會像Qos=1 那樣,會被接收兩次。當然這個設置會增加網絡的負載。當一個消息被發布出去的時候,broker會保存該消息的id,而且會利用任何長連接,堅持要把該消息推送給目標地址。如果Client收到PUBREC 標志,那就表明broker已經收到消息了。 這個時候broker會期待Client發送一個PUBREL 來清除session 中消息id,broker如果發送成功就會發送一個PUBCOMP通知Client。

Wildcard Subscriptions

通配用在主題的目標地址中。這能實現一個主題發送到多個用戶,或者多層用戶中。
  • / is used to separate names in a path(分割路徑)
  • + is used to match any name in a path(通配地址任何字符)
  • # is used to recursively match path names(遞歸通配)
比如通配可能這樣來用:
  • PRICE/# : Any price for any product on any exchange(任何交易中任何產品的價格)
  • PRICE/STOCK/# : Any price for a stock on any exchange(任何交易中的股票價格)
  • PRICE/STOCK/NASDAQ/+ : Any stock price on NASDAQ(納斯達克的任何股票價格)
  • PRICE/STOCK/+/IBM : Any IBM stock price on any exchange(任何交易中IBM股票價格)

Keep Alive

Apollo只有在Client指定了CONNECT的KeepAlive 值的時候,才會設置保持連接、心跳檢測。如果one Client指定了keepalive,apollo 將會使用1.5*keepalive值。這個在MQTT中有說明。

Destination Name Restrictions

路徑名稱限制了使用(a-z, A-Z, 0-9, _, - %, ~, :, ' ', '(', ')' ,. )字符,通配符(*)在復雜的分隔符中。而且確保使用utf-8來編譯你的URL。
  1 package com.xxx.mqtt;
  2 
  3 import java.net.URI;
  4 
  5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
  6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  7 import org.eclipse.paho.client.mqttv3.IMqttToken;
  8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
  9 import org.eclipse.paho.client.mqttv3.MqttCallback;
 10 import org.eclipse.paho.client.mqttv3.MqttClient;
 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 12 import org.eclipse.paho.client.mqttv3.MqttException;
 13 import org.eclipse.paho.client.mqttv3.MqttMessage;
 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 16 
 17 
 18 public class MyMqttClient implements MqttCallback {
 19 
 20     private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
 21     private static final String topic = "mytopic";
 22 
 23     private String HOST = "127.0.0.1";
 24     private int PORT = 1883;
 25     private String USERNAME = "user";
 26     private String PASSWORD = "password";
 27     private String serverURIString = "tcp://" + HOST + ":" + PORT;
 28     
 29     String clientId = "client-1";
 30 
 31     MqttAsyncClient client;
 32     // Tokens
 33     IMqttToken connectToken;
 34     IMqttDeliveryToken pubToken;
 35 
 36 
 37     public static void main(String[] args) {
 38         MyMqttClient app = new MyMqttClient();
 39         app.asyncClient();
 40         try {
 41             Thread.sleep(20000);
 42             app.disconnect();
 43         } catch (Exception e) {
 44             e.printStackTrace();
 45         }
 46         System.out.println("end");
 47     }
 48 
 49     public void blockingClient() {
 50         
 51         try {
 52             MqttClient sampleClient = new MqttClient(serverURIString, clientId);
 53             MqttConnectOptions connOpts = new MqttConnectOptions();
 54             connOpts.setCleanSession(true);
 55             connOpts.setUserName(USERNAME);
 56             connOpts.setPassword(PASSWORD.toCharArray());
 57             System.out.println("Connecting to broker: " + serverURIString);
 58             sampleClient.connect(connOpts);
 59             sampleClient.subscribe("#", 1);
 60             System.out.println("Connected");
 61 //                System.out.println("Publish message: " + content);
 62 //                MqttMessage message = new MqttMessage(content.getBytes());
 63 //                message.setQos(qos);
 64             sampleClient.setCallback(this);
 65 //                sampleClient.publish(topic, message);
 66 //                System.out.println("Message published");
 67             try {
 68                 Thread.sleep(10000000);
 69                 System.out.println("Disconnected");
 70                 sampleClient.disconnect();
 71             } catch (Exception e) {
 72                 e.printStackTrace();
 73             }
 74 
 75         } catch (MqttException me) {
 76             System.out.println("reason " + me.getReasonCode());
 77             System.out.println("msg " + me.getMessage());
 78             System.out.println("loc " + me.getLocalizedMessage());
 79             System.out.println("cause " + me.getCause());
 80             System.out.println("except " + me);
 81             me.printStackTrace();
 82         }
 83     }
 84 
 85     public void asyncClient() {
 86         info(" MQTT init start.");
 87 
 88         // Tokens
 89         IMqttToken connectToken;
 90         IMqttDeliveryToken pubToken;
 91 
 92         // Client Options
 93         MqttConnectOptions options = new MqttConnectOptions();
 94         options.setCleanSession(false);
 95         options.setAutomaticReconnect(true);
 96 
 97         options.setUserName(USERNAME);
 98         options.setPassword(PASSWORD.toCharArray());
 99 
100         try {
101             client = new MqttAsyncClient(serverURIString, clientId);
102 
103             DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
104             disconnectedOpts.setBufferEnabled(true);
105             client.setBufferOpts(disconnectedOpts);
106 
107             connectToken = client.connect(options);
108             connectToken.waitForCompletion();//異步變成了同步。可以用IMqttCallbackListen..在connect時候設置回調。
109             boolean isConnected = client.isConnected();
110             info("Connection isConnected: " + isConnected);
111 
112             if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) {
113                 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 
114                 client.setCallback(this);
115 
116             } else {
117                 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 
118             }
119 
120 //             MqttTopic topic = client.getTopic(topic); 
121 //             topic.
122             //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息    
123 //                options.setWill(topic, "close".getBytes(), 2, true);  
124 
125             IMqttToken subToken = client.subscribe("#", 1);
126 
127             subToken.waitForCompletion(1000);
128 
129             if (subToken.isComplete()) {
130                 info("subToken  complete.");
131                 if (subToken.getException() != null) {
132                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
133                 }
134             } else {
135                 info("subToken not complete.");
136                 if (subToken.getException() != null) {
137                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
138                 }
139             }
140 
141             info("init end");
142 
143         } catch (MqttException e) {
144             // TODO Auto-generated catch block
145             e.printStackTrace();
146         }
147 
148     }
149 
150 //String clientId, String topic, String message
151     public void send() {
152         String topic;
153         String message;
154         info("===Send Message start.===");
155         message = "Hello, boy.";
156         
157 
158         boolean isConnected = client.isConnected();
159         if (!isConnected) {
160             //no need. it will auto reconnect and send.
161         }
162 
163         // Publish Message
164         try {
165             pubToken = client.publish(topic, new MqttMessage(message.getBytes()));
166 
167             info("Publish attempted: isComplete:" + pubToken.isComplete());
168 
169             pubToken.waitForCompletion();
170         } catch (MqttPersistenceException e) {
171             // TODO Auto-generated catch block
172             e.printStackTrace();
173         } catch (MqttException e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177 
178         // Check that Message has been delivered
179         info("Message Delivered: " + pubToken.isComplete());
180         info("=== send end.====");
181     }
182 
183     void disconnect() {
184         IMqttToken disconnectToken;
185         try {
186             disconnectToken = client.disconnect();
187             disconnectToken.waitForCompletion();
188             client.close();
189         } catch (MqttException e) {
190             // TODO Auto-generated catch block
191             e.printStackTrace();
192         }
193         client = null;
194     }
195 
196     void info(String s) {
197         System.out.println(s);
198     }
199 
200     public void connectionLost(Throwable thrwbl) {
201         // TODO Auto-generated method stub
202         info("connectionLost");
203 
204         info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage());
205         thrwbl.printStackTrace();
206 
207     }
208 
209     public void deliveryComplete(IMqttDeliveryToken arg0) {
210         // TODO Auto-generated method stub
211         info("deliveryComplete");
212 
213     }
214 
215     public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
216         // TODO Auto-generated method stub
217         String message = new String(arg1.getPayload());
218         String topic = arg0;
219 
220         info("xxx Receive : topic=" + topic + "; message=" + message);
221 
222     }
223 }
View Code
 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4 
 5     <groupId>com.italktv.mqtt.client</groupId>
 6     <artifactId>mqttclient</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8     <packaging>jar</packaging>
 9 
10     <name>mqttclient</name>
11     <url>http://maven.apache.org</url>
12 
13     <properties>
14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15     </properties>
16 
17     <dependencies>
18         <dependency>
19             <groupId>junit</groupId>
20             <artifactId>junit</artifactId>
21             <version>3.8.1</version>
22             <scope>test</scope>
23         </dependency>
24 
25 
26         <dependency>
27             <groupId>org.eclipse.paho</groupId>
28             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
29             <version>1.1.0</version>
30         </dependency>
31 
32     </dependencies>
33 </project>
View Code

上面是maven管理項目的pom.xml


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM