Android MQTT的發布與訂閱


一、MQTT介紹

鏈接1(菜鳥教程):https://www.runoob.com/w3cnote/mqtt-intro.html

連接2(MQTT中文網):http://mqtt.p2hp.com/

連接3(Android開發之Mqtt的使用):https://blog.csdn.net/asjqkkkk/article/details/80714234

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議)。一種基於發布/訂閱(publish/subscribe)模式的“輕量級”通訊協議。構建於TCP/IP協議上,由IBM在1999年發布。

二、程序示例

  1 public class MqttManager {
  2 
  3     private static boolean initFirst = true;//是否第一次初始化mqtt標識符
  4     private static String host = "tcp://47.106.172.221:8081";
  5     private static String userName; //mqtt用戶名
  6     private static String passWord; //mqtt登陸密碼
  7     private static MqttManager manager;
  8     private static MqttClient mqttClient;
  9     private static MqttConnectOptions options;
 10     private static String topic;//訂閱的主題
 11     private static String clientId; //客戶端id
 12 
 13     private static Handler handler = new Handler() {
 14         @Override
 15         public void handleMessage(Message msg) {
 16             super.handleMessage(msg);
 17             if (msg.what == 1) {
 18                 KLog.d(msg.obj);
 19                 EventBus.getDefault()
 20                         .post(new MessageEventBean(AppConstants.MQTT_EVENT_TYPE, (String) msg.obj));
 21             } else if (msg.what == 2) {
 22                 KLog.d("連接成功");
 23                 try {
 24                     KLog.d("訂閱的主題:" + topic);
 25                     mqttClient.subscribe(topic, 0);
 26 
 27                 } catch (Exception e) {
 28                     e.printStackTrace();
 29                 }
 30             } else if (msg.what == 3) {
 31                 KLog.d("連接失敗,系統正在重連");
 32             }
 33         }
 34     };
 35 
 36     private MqttManager() {
 37 
 38     }
 39     private static MqttCallback myMqttCallback = new MqttCallback(){
 40 
 41         @Override
 42         public void messageArrived(String topic, MqttMessage message){
 43             //subscribe后得到的消息會執行到這里面
 44             KLog.d("messageArrived topic:"+topic);
 45             Message msg = new Message();
 46             msg.what = 1;
 47             msg.obj = message.toString();
 48             handler.sendMessage(msg);
 49         }
 50         @Override
 51         public void connectionLost(Throwable cause) {
 52             KLog.d("connectionLost cause = "+cause);
 53             //連接丟失后,一般在這里面進行重連
 54                 try{
 55                     KLog.d("mqtt重連");
 56                     manager.startReconnect();
 57                 }catch (Exception e){
 58                     KLog.d("Exception = "+ e);
 59                     e.printStackTrace();
 60                 }
 61         }
 62 
 63         @Override
 64         public void deliveryComplete(IMqttDeliveryToken token) {
 65             //publish后會執行到這里
 66             KLog.d("deliveryComplete");
 67         }
 68     };
 69     private ScheduledExecutorService scheduler;
 70 
 71     public static MqttManager getInstance() {
 72         if (manager == null) {
 73             manager = new MqttManager();
 74         }
 75         return manager;
 76     }
 77 
 78     public void initConnection() {
 79         if (initFirst){
 80             KLog.d("第一次調用initConnection");
 81             try {
 82                 clientId = Preferences.getUserAccount() + System.currentTimeMillis();//客戶端標識符(本機mac地址+當前時間ms)
 83                 userName = Preferences.getUserAccount();//用戶名
 84                 passWord = Preferences.getUserToken();//密碼
 85                 topic = userName;
 86                 //host為主機名;clientid即連接MQTT的客戶端ID,是客戶端的唯一標識符;MemoryPersistence設置clientid的保存形式,默認為以內存保存
 87                 mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
 88                 //MQTT的連接設置
 89                 options = new MqttConnectOptions();
 90                 //設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
 91                 options.setCleanSession(true);
 92                 //斷開后,是否自動連接
 93                 options.setAutomaticReconnect(true);
 94                 //設置連接的用戶名
 95                 options.setUserName(userName);
 96                 //設置連接的密碼
 97                 options.setPassword(passWord.toCharArray());
 98                 // 設置超時時間 單位為秒
 99                 options.setConnectionTimeout(10);
100                 // 設置會話心跳時間 單位為秒 服務器會每隔1.5*(20)秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
101                 options.setKeepAliveInterval(20);
102                 //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
103 //            options.setWill(topic,"close".getBytes(),2,true);
104                 //設置回調
105                 mqttClient.setCallback(myMqttCallback);
106                 KLog.d("clientId: "+clientId +", userName: "+userName+", passWord: "+passWord+", topic: "+topic);
107                 //設置標識符狀態
108                 initFirst = false;
109                 //mqtt第一次連接
110                 manager.startReconnect();
111             } catch (Exception e) {
112                 KLog.d("initConnection Exception: " + e);
113                 e.printStackTrace();
114             }
115         }else {
116             KLog.d("網絡重連后調用initConnection");
117             manager.startReconnect();
118         }
119     }
120 
121     public  void startReconnect() {
122 
123         if (NetworkUtils.isConnected()){
124 
125             if (!mqttClient.isConnected()) {
126                 //重新連接
127                 connect();
128                 KLog.d("mqtt連接結束");
129             }else {
130                 KLog.d("mqttClient.isConnected");
131             }
132 //            scheduler = Executors.newSingleThreadScheduledExecutor();
133 //            scheduler.scheduleAtFixedRate(new Runnable() {
134 //                @Override
135 //                public void run() {
136 //
137 //                    if (!mqttClient.isConnected()) {
138 //                        connect();
139 //                        KLog.d("mqtt連接結束");
140 //                    }
141 //                }
142 //            }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
143         }else {
144             KLog.d("網絡不可用");
145 //            scheduler.shutdown();
146         }
147 
148     }
149 
150     public void sendMsg(String msg) {
151         KLog.d("sendMsg");
152         if (mqttClient != null && mqttClient.isConnected()) {
153             try {
154                 KLog.d("發送的主題:" + Preferences.getUserAccount());
155                 String topic = Preferences.getUserAccount();
156                 KLog.d(topic);
157                 byte[] msgBytes = msg.getBytes();
158                 KLog.d("0000");
159                 mqttClient.publish(topic, msgBytes, 0, false);
160                 KLog.d("11111111111111111");
161             } catch (MqttException e) {
162                 KLog.d(e);
163             }
164         }
165     }
166 
167     //發布的主題設為pubTopic = "owh" + Preferences.getUserAccount();
168     //發布主題(發布主題和訂閱主題應設為不同值)
169     public void publish(String topicName, String payload) {
170         if (mqttClient != null && mqttClient.isConnected()) {
171             // 創建和配置一個消息
172             MqttMessage message = new MqttMessage(payload.getBytes());
173             message.setPayload(payload.getBytes());
174             message.setQos(0);
175             try {
176                 KLog.d("1111");
177                 mqttClient.publish(topicName, message);
178                 KLog.d("2222");
179             } catch (MqttException e) {
180                 KLog.d("publish : " + e.toString());
181             }
182         }
183     }
184 
185     private void connect() {
186 
187         ThreadPoolManager.getInstance().execute(new Runnable() {
188             @Override
189             public void run() {
190                 try {
191                     mqttClient.connect(options);
192                     Message msg = Message.obtain();
193                     msg.what = 2;
194                     handler.sendMessage(msg);
195                 } catch (Exception e) {
196                     e.printStackTrace();
197                     Message msg = Message.obtain();
198                     msg.what = 3;
199                     handler.sendMessage(msg);
200                 }
201             }
202         });
203 
204 //        new Thread(new Runnable() {
205 //
206 //            @Override
207 //            public void run() {
208 //                try {
209 //                    mqttClient.connect(options);
210 //                    Message msg = new Message();
211 //                    msg.what = 2;
212 //                    handler.sendMessage(msg);
213 //                } catch (Exception e) {
214 //                    e.printStackTrace();
215 //                    Message msg = new Message();
216 //                    msg.what = 3;
217 //                    handler.sendMessage(msg);
218 //                }
219 //            }
220 //        }).start();
221     }
222 
223     //斷開連接
224     public static void mqttDisconnect(){
225         if(mqttClient !=null && mqttClient.isConnected()){
226             try{
227                 mqttClient.disconnect();
228             }catch (MqttException e){
229                 KLog.d("mqtt disconnect error");
230                 e.printStackTrace();
231             }
232         }
233     }
234 
235 }

 

三、注意事項

1、MQTT的客戶端id(clientId)須唯一。在此項目中clientId = 本機mac地址 + 當前時間(ms)。

2、一個客戶端的一個MQTT連接最好只new一個對象,避免一台設備產生多個客戶端賬號。

當多個發布(/訂閱)的clientId相同時,會發生Mqtt反復重連的現象,無法正常發送或接收消息。

當多個發布(/訂閱)的clientId不同時,會造成一台設備多個Mqtt賬號同時在線,占用了多余的服務器資源。

3、一個客戶端的發布Topic和訂閱Topic不應相同。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM