JAVA實現和AWS IOT訂閱、發布MQTT消息


前言

本文使用JAVA實現和AWS IOT傳遞MQTT消息。

1.  AWS控制台,訂閱某個主題的消息。JAVA代碼向同一主題推送消息。可在AWS控制台查看推送過來的消息。

2. JAVA訂閱某個主題的消息。AWS控制台向同一主題推送消息。可在JAVA輸出AWS推送過來的消息。

即可實現雙向通信。只要訂閱的是同一個主題,均可實現消息的通信。

因為用到了AWS服務,故需要使用AWS相關配置。此處參見AWS官網。

記錄下本地代碼位置:D:\len\iotServer

展示效果

第一張圖展示AWS控制台訂閱sdk/test/java主題

第二張圖展示JAVA推送過來的消息

 代碼實現

main方法中可通過注釋或打開【訂閱消息】或【推送消息】結合AWS控制台測試。

也可同時打開,消息會自己推送自己接收,JAVA控制台效果如下。

 具體代碼

import com.amazonaws.services.iot.client.*;
import java.util.UUID;

// 推送、訂閱消息的主實現
public class ApplicationMain {

    private static AWSIotMqttClient awsIotClient;

    // 訂閱主題
    private static final String TestTopic = "sdk/test/java";
    private static final AWSIotQos TestTopicQos = AWSIotQos.QOS0;

    // AWS連接憑證-需要使用自己的配置
    private static final String clientEndpoint = "XXX-ats.iot.cn-northwest-1.amazonaws.com.cn";
    private static final String awsAccessKeyId = "XXX";
    private static final String awsSecretAccessKey = "XXX";

    // 事物ID-需保證唯一
    public static String clientId = UUID.randomUUID().toString();

    /**
     * 推送和訂閱的主方法
     */
    public static void main(String[] args) throws AWSIotException, InterruptedException {
        awsIotClient = initClient(); // 初始化設備連接
        awsIotClient.connect();

        // 訂閱消息
        AWSIotTopic topic = new TopicListener(TestTopic, TestTopicQos);
        awsIotClient.subscribe(topic, true);

        // 推送消息
        Thread publisherThread = new Thread(new PubisherThread(awsIotClient));
        publisherThread.start();
        publisherThread.join();
    }

    /**
     * 初始化設備連接
     * @return
     */
    public static AWSIotMqttClient initClient() {
        if (awsAccessKeyId != null && awsSecretAccessKey != null) {
            awsIotClient = new AWSIotMqttClient(clientEndpoint, clientId, awsAccessKeyId, awsSecretAccessKey);
        }
        return awsIotClient;
    }

    /**
     * 循環推送數據
     */
    public static class PubisherThread implements Runnable {
        private final AWSIotMqttClient awsIotClient;

        public PubisherThread(AWSIotMqttClient awsIotClient) {
            this.awsIotClient = awsIotClient;
        }

        @Override
        public void run() {
            long counter = 1;

            while (true) {
                String payload = "測試:" + (counter++);
                AWSIotMessage message = new PublishListener(TestTopic, TestTopicQos, payload);
                try {
                    awsIotClient.publish(message);
                } catch (AWSIotException e) {
                    System.out.println("推送失敗:" + payload);
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("推送中斷");
                    return;
                }
            }
        }
    }
}

 推送消息后的回調類

import com.amazonaws.services.iot.client.AWSIotMessage;
import com.amazonaws.services.iot.client.AWSIotQos;

/**
 * 推送消息后回調
 */
public class PublishListener extends AWSIotMessage {

    public PublishListener(String topic, AWSIotQos qos, String payload) {
        super(topic, qos, payload);
    }

    @Override
    public void onSuccess() {
        System.out.println("推送成功回調:" + getStringPayload());
    }

    @Override
    public void onFailure() {
        System.out.println("推送失敗回調:" + getStringPayload());
    }

    @Override
    public void onTimeout() {
        System.out.println("推送超時回調:" + getStringPayload());
    }

}

訂閱主題后,接收內容類

import com.amazonaws.services.iot.client.AWSIotMessage;
import com.amazonaws.services.iot.client.AWSIotQos;
import com.amazonaws.services.iot.client.AWSIotTopic;

/**
 * 訂閱主題后,接收內容
 */
public class TopicListener extends AWSIotTopic {

    public TopicListener(String topic, AWSIotQos qos) {
        super(topic, qos);
    }

    @Override
    public void onMessage(AWSIotMessage message) {
        System.out.println("接收數據: " + message.getStringPayload());
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM