前言
本文使用JAVA實現和AWS IOT傳遞MQTT消息。
1. AWS控制台,訂閱某個主題的消息。JAVA代碼向同一主題推送消息。可在AWS控制台查看推送過來的消息。
2. JAVA訂閱某個主題的消息。AWS控制台向同一主題推送消息。可在JAVA輸出AWS推送過來的消息。
即可實現雙向通信。只要訂閱的是同一個主題,均可實現消息的通信。
因為用到了AWS服務,故需要使用AWS相關配置。此處參見AWS官網。
記錄下本地代碼位置:D:\len\iotServer
展示效果
第一張圖展示AWS控制台訂閱sdk/test/java主題
第二張圖展示JAVA推送過來的消息
代碼實現
main方法中可通過注釋或打開【訂閱消息】或【推送消息】結合AWS控制台測試。
也可同時打開,消息會自己推送自己接收,JAVA控制台效果如下。
具體代碼
import com.amazonaws.services.iot.client.*; import java.util.UUID; // 推送、訂閱消息的主實現 public class ApplicationMain { private static AWSIotMqttClient awsIotClient; // 訂閱主題 private static final String TestTopic = "sdk/test/java"; private static final AWSIotQos TestTopicQos = AWSIotQos.QOS0; // AWS連接憑證-需要使用自己的配置 private static final String clientEndpoint = "XXX-ats.iot.cn-northwest-1.amazonaws.com.cn"; private static final String awsAccessKeyId = "XXX"; private static final String awsSecretAccessKey = "XXX"; // 事物ID-需保證唯一 public static String clientId = UUID.randomUUID().toString(); /** * 推送和訂閱的主方法 */ public static void main(String[] args) throws AWSIotException, InterruptedException { awsIotClient = initClient(); // 初始化設備連接 awsIotClient.connect(); // 訂閱消息 AWSIotTopic topic = new TopicListener(TestTopic, TestTopicQos); awsIotClient.subscribe(topic, true); // 推送消息 Thread publisherThread = new Thread(new PubisherThread(awsIotClient)); publisherThread.start(); publisherThread.join(); } /** * 初始化設備連接 * @return */ public static AWSIotMqttClient initClient() { if (awsAccessKeyId != null && awsSecretAccessKey != null) { awsIotClient = new AWSIotMqttClient(clientEndpoint, clientId, awsAccessKeyId, awsSecretAccessKey); } return awsIotClient; } /** * 循環推送數據 */ public static class PubisherThread implements Runnable { private final AWSIotMqttClient awsIotClient; public PubisherThread(AWSIotMqttClient awsIotClient) { this.awsIotClient = awsIotClient; } @Override public void run() { long counter = 1; while (true) { String payload = "測試:" + (counter++); AWSIotMessage message = new PublishListener(TestTopic, TestTopicQos, payload); try { awsIotClient.publish(message); } catch (AWSIotException e) { System.out.println("推送失敗:" + payload); } try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("推送中斷"); return; } } } } }
推送消息后的回調類
import com.amazonaws.services.iot.client.AWSIotMessage; import com.amazonaws.services.iot.client.AWSIotQos; /** * 推送消息后回調 */ public class PublishListener extends AWSIotMessage { public PublishListener(String topic, AWSIotQos qos, String payload) { super(topic, qos, payload); } @Override public void onSuccess() { System.out.println("推送成功回調:" + getStringPayload()); } @Override public void onFailure() { System.out.println("推送失敗回調:" + getStringPayload()); } @Override public void onTimeout() { System.out.println("推送超時回調:" + getStringPayload()); } }
訂閱主題后,接收內容類
import com.amazonaws.services.iot.client.AWSIotMessage; import com.amazonaws.services.iot.client.AWSIotQos; import com.amazonaws.services.iot.client.AWSIotTopic; /** * 訂閱主題后,接收內容 */ public class TopicListener extends AWSIotTopic { public TopicListener(String topic, AWSIotQos qos) { super(topic, qos); } @Override public void onMessage(AWSIotMessage message) { System.out.println("接收數據: " + message.getStringPayload()); } }