android 實現mqtt消息推送,以及不停斷線重連的問題解決


前段時間項目用到mqtt的消息推送,整理一下代碼,代碼的原型是網上找的,具體哪個地址已經忘記了。

代碼的實現是新建了一個MyMqttService,全部功能都在里面實現,包括連服務器,斷線重連,訂閱消息,處理消息,發布消息等基本操作。

首先添加依賴:

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
然后編輯AndroidManifest.xml,先添加權限:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
再注冊service:

<service android:name="org.eclipse.paho.android.service.MqttService" />
<service
    android:name=".service.MyMqttService"
    android:enabled="true"
    android:exported="true"/>
接着進入正文MyMqttService.java,功能見注釋吧:

 

package com.example.nan.mqtt.service;

import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.os.IBinder;
import android.support.v4.app.NotificationCompat;
import android.util.Log;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;

/**
* @author nan
*/
public class MyMqttService extends Service {

private static final String TAG = "nlgMqttService";
private static final String TOPIC_TO_QA = "/s2c/task_quality/";

private static final String publishTopic = "exampleAndroidPublishTopic";

private MqttAndroidClient mqttAndroidClient;
private NotificationManager mNotificationManager;


public MyMqttService() {
}

@Override
public IBinder onBind(Intent intent) {
// TODO: Return the communication channel to the service.
throw new UnsupportedOperationException("Not yet implemented");
}

@Override
public void onCreate() {
super.onCreate();
Log.d(TAG, "MqttService onCreate executed");
//mqtt服務器的地址
final String serverUri = "tcp://192.168.10.10:1883";
//新建Client,以設備ID作為client ID
mqttAndroidClient = new MqttAndroidClient(MyMqttService.this, serverUri, getIMEI());
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
//連接成功
if (reconnect) {
Log.d(TAG, "connectComplete: " + serverURI);
// Because Clean Session is true, we need to re-subscribe
subscribeAllTopics();
} else {
Log.d(TAG, "connectComplete: " + serverURI);
}
}

@Override
public void connectionLost(Throwable cause) {
//連接斷開
Log.d(TAG, "connectionLost: connection was lost");
}

@Override
public void messageArrived(String topic, MqttMessage message) {
//訂閱的消息送達,推送notify
String payload = new String(message.getPayload());
Log.d(TAG, "Topic: " + topic + " ==> Payload: " + payload);
if(mNotificationManager == null) {
mNotificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
}
int roleId = SinSimApp.getApp().getRole();
Gson gson = new Gson();
ServerToClientMsg msg = gson.fromJson(payload, new TypeToken<ServerToClientMsg>(){}.getType());
if(msg != null) {
    //接受消息
    if(topic != null) {
if(topic.equals(TOPIC_TO_QA)) {
Intent intent = new Intent(MyMqttService.this, ProcessToCheckoutActivity.class);
PendingIntent pi = PendingIntent.getActivity(MyMqttService.this, 0, intent, 0);
NotificationCompat.Builder builder = new NotificationCompat.Builder(MyMqttService.this, TOPIC_TO_QA);
Notification notify = builder.setSmallIcon(R.mipmap.to_quality)
.setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.to_quality))
.setDefaults(Notification.DEFAULT_SOUND|Notification.DEFAULT_VIBRATE)//響鈴震動
.setContentTitle("快遞來了")
.setAutoCancel(true)
.setContentIntent(pi)
.setVisibility(Notification.VISIBILITY_PUBLIC)
.setContentText("你的快遞單號:" + msg.getOrderNum())
//不設置此項不會懸掛,false 不會出現懸掛
.build();
mNotificationManager.notify(2,notify);
}
}
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//即服務器成功delivery消息
}
});
//新建連接設置
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//斷開后,是否自動連接
mqttConnectOptions.setAutomaticReconnect(true);
//是否清空客戶端的連接記錄。若為true,則斷開后,broker將自動清除該客戶端連接信息
mqttConnectOptions.setCleanSession(false);
//設置超時時間,單位為秒
//mqttConnectOptions.setConnectionTimeout(2);
//心跳時間,單位為秒。即多長時間確認一次Client端是否在線
//mqttConnectOptions.setKeepAliveInterval(2);
//允許同時發送幾條消息(未收到broker確認信息)
//mqttConnectOptions.setMaxInflight(10);
//選擇MQTT版本
mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
try {
Log.d(TAG, "onCreate: Connecting to " + serverUri);
    //開始連接
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "onSuccess: Success to connect to " + serverUri);
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
    //成功連接以后開始訂閱
subscribeAllTopics();
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//連接失敗
    Log.d(TAG, "onFailure: Failed to connect to " + serverUri);
exception.printStackTrace();
}
});
} catch (MqttException ex) {
ex.printStackTrace();
}

//service綁定notification
Intent intent = new Intent(this, SplashActivity.class);
intent.putExtra(SinSimApp.FROM_NOTIFICATION, true);
//這邊設置“FLAG_UPDATE_CURRENT”是為了讓后面的Activity接收pendingIntent中Extra的數據
PendingIntent pi = PendingIntent.getActivity(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT);
Notification notification = new NotificationCompat.Builder(this)
.setContentTitle("mqtt快遞")
.setContentText("mqtt快遞管理系統")
.setWhen(System.currentTimeMillis())
.setSmallIcon(R.mipmap.ic_launcher)
.setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher))
.setContentIntent(pi)
.build();
startForeground(1, notification);
}

//訂閱所有消息
    private void subscribeAllTopics() {
subscribeToTopic(TOPIC_TO_QA);
}

/**
* 訂閱消息
*/
public void subscribeToTopic(String subscriptionTopic) {
try {
mqttAndroidClient.subscribe(subscriptionTopic, 2, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "onSuccess: Success to Subscribed!");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.d(TAG, "onFailure: Failed to subscribe");
}
});
} catch (MqttException ex) {
Log.d(TAG, "subscribeToTopic: Exception whilst subscribing");
ex.printStackTrace();
}
}

/**
* 發布消息
*/
public void publishMessage(String msg) {
try {
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
mqttAndroidClient.publish(publishTopic, message);
Log.d(TAG, "publishMessage: Message Published: " + msg);
} catch (MqttException e) {
Log.d(TAG, "publishMessage: Error Publishing: " + e.getMessage());
e.printStackTrace();
}
}

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
Log.d(TAG, "MqttService onStartCommand executed");
return super.onStartCommand(intent, flags, startId);
}

@Override
public void onDestroy() {
super.onDestroy();
try {
if(mqttAndroidClient!=null){
//服務退出時client斷開連接
mqttAndroidClient.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
Log.d(TAG, "MqttService onDestroy executed");
}
}


調試過程中出現過一個小插曲:服務在有些時候會不停的斷線重連。斷線重連的設置是開了的:
mqttConnectOptions.setAutomaticReconnect(true);
但是斷開的原因找不到,當時還沒有重寫onDestory方法,就算退出應用也還在重連,一度懷疑service的開啟與關閉的問題,還系統的重新學習了一下service的使用,學完以后也沒有啥進展,然后重學mqtt的調用流程發揮了效果,在onDestory里面調用了disconnect()方法,完了以后在退出應用以后就不會重連了,但是重新開還是繼續不停重連。到了晚上,奇怪的事情發生了,當夜深人靜,獨自加班的時候,居然再也復現不了了。為什么呢,心想可能平時給八阿哥上的香起了效果,那就開心的回家吧。下班的路上雖然開心的吃了塊雞排,但心里的結還是沒有打開,為什么呢,是道德的淪喪還是人性的扭曲,讓我獨自加班還不餓給我復現問題。突然靈光一現,想到今天特么加班就我一個人,也就是一個人玩就是好的,玩的人多就會有問題,那么答案就來了,跟唯一性有關的只有clientID了,特么老子把clientID設置成用戶id了,測試用的用戶id就注冊了3個,好幾個人來回切着用,不出問題才怪。於是我默默的把clientID改成了設備id,困擾2天的問題就這么解決了。
---------------------
作者:邦德總管
來源:CSDN
原文:https://blog.csdn.net/a5nan/article/details/79975488
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM