Android Apollo MQTT入門


 

一、Apache Apollo服務器其實是一個消息中轉站

下載地址 http://activemq.apache.org/apollo/download.html

 

服務搭建方式,參看博客Android APP必備高級功能,消息推送之MQTT

1、命令行進入解壓后bin目錄(例:E:>cd E:\MQTT\apache-apollo-1.7.1\bin)。

2、輸入apollo create XXX(xxx為創建的服務器實例名稱,例:apollo create mybroker),之后會在bin目錄下創建名稱為XXX的文件夾。

  XXX文件夾下etc\apollo.xml文件下是配置服務器信息的文件。

  etc\users.properties文件包含連接MQTT服務器時用到的用戶名和密碼,默認為admin=password,即賬號為admin,密碼為password,可自行更改。

3、進入XXX/bin目錄,輸入apollo-broker.cmd run開啟服務器,看到如下界面代表搭建完成

 

4、添加Windows服務

進入XXX/bin目錄,如下圖:

 

 

二、Android

AndroidManifest

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />
        <!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name="com.zyp.mqtt.MQTTService"/>

依賴項

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.android.tools.build:gradle:2.3.2'

        // NOTE: Do not place your application dependencies here; they belong
        // in the individual module build.gradle files
    }
}

allprojects {
    repositories {
        jcenter()
        mavenCentral()
        maven { url "https://repo.eclipse.org/content/repositories/paho-releases/" }
    }
}
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
    compile 'org.greenrobot:eventbus:3.0.0'

Android Service

public class MQTTService  extends Service {

    public static final String TAG = MQTTService.class.getSimpleName();

    private static MqttAndroidClient client;
    private MqttConnectOptions conOpt;

    private String host = "tcp://10.0.2.2:61613";private String userName = "admin";
    private String passWord = "password";
    private static String myTopic = "topic";
    private String clientId = "test";

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        init();
        return super.onStartCommand(intent, flags, startId);
    }

    public static void publish(String msg){
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        try {
            client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void init() {
        // 服務器地址(協議+地址+端口號)
        String uri = host;
        client = new MqttAndroidClient(this, uri, clientId);
        // 設置MQTT監聽並且接受消息
        client.setCallback(mqttCallback);

        conOpt = new MqttConnectOptions();
        // 清除緩存
        conOpt.setCleanSession(true);
        // 設置超時時間,單位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包發送間隔,單位:秒
        conOpt.setKeepAliveInterval(20);
        // 用戶名
        conOpt.setUserName(userName);
        // 密碼
        conOpt.setPassword(passWord.toCharArray());

        // last will message
        boolean doConnect = true;
        String message = "{\"terminal_uid\":\"" + clientId + "\"}";
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        if ((!message.equals("")) || (!topic.equals(""))) {
            // 最后的遺囑
            try {
                conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
            } catch (Exception e) {
                Log.i(TAG, "Exception Occured", e);
                doConnect = false;
                iMqttActionListener.onFailure(null, e);
            }
        }

        if (doConnect) {
            doClientConnection();
        }

    }

    @Override
    public void onDestroy() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }

    /** 連接MQTT服務器 */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNomarl()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }

    // MQTT是否連接成功
    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "連接成功 ");
            try {
                // 訂閱myTopic話題
                client.subscribe(myTopic,1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            // 連接失敗,重連
        }
    };

    // MQTT監聽並且接受消息
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {

            String str1 = new String(message.getPayload());
            MQTTMessage msg = new MQTTMessage();
            msg.setMessage(str1);
            EventBus.getDefault().post(msg);
            String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
            Log.i(TAG, "messageArrived:" + str1);
            Log.i(TAG, str2);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去連接,重連
        }
    };

    /** 判斷網絡是否連接 */
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.i(TAG, "MQTT當前網絡名稱:" + name);
            return true;
        } else {
            Log.i(TAG, "MQTT 沒有可用網絡");
            return false;
        }
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }
}

MainActivity 界面方法

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        EventBus.getDefault().register(this);
        startService(new Intent(this, MQTTService.class));
        findViewById(R.id.publishBtn).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                MQTTService.publish("CSDN 一口仨饃");
            }
        });
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void getMqttMessage(MQTTMessage mqttMessage){
        Log.i(MQTTService.TAG,"get message:"+mqttMessage.getMessage());
        Toast.makeText(this,mqttMessage.getMessage(),Toast.LENGTH_SHORT).show();
    }

    @Override
    protected void onDestroy() {
        EventBus.getDefault().unregister(this);
        super.onDestroy();
    }
}

 

三、Java服務端

pom文件

    <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
</dependency>

Server 程序入口

package com.zyp.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Server {

    public static final String HOST = "tcp://localhost:61613"; 
     
    public static final String TOPIC = "topic"; 
    private static final String clientid ="zhaoyazhou_server";  
 
    private MqttClient client; 
    private MqttTopic topic; 
    private String userName = "admin"; 
    private String passWord = "password"; 
 
    private MqttMessage message; 
 
    public Server() throws MqttException { 
         //MemoryPersistence設置clientid的保存形式,默認為以內存保存 
        client = new MqttClient(HOST, clientid, new MemoryPersistence()); 
        connect(); 
    } 
     
    private void connect() { 
        MqttConnectOptions options = new MqttConnectOptions(); 
        options.setCleanSession(true); 
        options.setUserName(userName); 
        options.setPassword(passWord.toCharArray()); 
        // 設置超時時間 
        options.setConnectionTimeout(10); 
        // 設置會話心跳時間 
        options.setKeepAliveInterval(20); 
        try { 
               client.setCallback(new PushCallback()); 
               client.connect(options); 
               topic = client.getTopic(TOPIC); 
        } catch (Exception e) { 
               e.printStackTrace(); 
        } 
    } 
     
    public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{ 
        MqttDeliveryToken token = topic.publish(message); 
        token.waitForCompletion(); 
        System.out.println(token.isComplete()+"========"); 
    } 
 
    public static void main(String[] args) throws MqttException { 
        Server server =  new Server(); 
        server.message = new MqttMessage(); 
        server.message.setQos(1); 
        server.message.setRetained(true); 
        server.message.setPayload("Server測試MQTT推送消息".getBytes()); 
         server.publish(server.message); 
         System.out.println(server.message.isRetained()+"------ratained狀態"); 
    } 
 
}

回調函數 PushCallback

package com.zyp.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PushCallback  implements MqttCallback {

 
    @Override
    public void connectionLost(Throwable arg0) {
        // 連接丟失后,一般在這里面進行重連 
        System.out.println("連接斷開,可以做重連");
       
    }
    
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
         // publish后會執行到這里 
        System.out.println("deliveryComplete---------"+ token.isComplete());  
       
    }
    
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
         // subscribe后得到的消息會執行到這里面 
        System.out.println("訂閱的字符串:"+topic);
        System.out.println("消息內容:"+message.toString());
       
    } 
}  

 

四、調試

啟動Apollo服務

將Android App啟動,鏈接上服務器,如圖:

啟動服務端程序,發送信息,如圖:

手機端接收到信息,如圖:

 

 

 

參考博客:Android APP必備高級功能,消息推送之MQTT

參考博客:MQTT JAVA發送、訂閱、收集消息

參考博客:MQTT協議之 Apache Apollo服務

 

參考文章:MQTT Part 4 發布,訂閱和退訂

參考文章:MQTT基礎入門第四部分:MQTT 發布,訂閱以及退訂

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM