ActiveMQ MQTT: A Simple Message Push Example


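Introduction

    Sample code that connects to ActiveMQ over MQTT and pushes simple messages.

Implementation

Environment setup

    Start ActiveMQ with Docker, then check that the MQTT listener port is correct. With the commands below, the mqtt transportConnector entry in the printed activemq.xml should show port 1883: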

docker run -dit --name activemq -p 11616:61616 -p 8161:8161 -p 1883:1883 rmohr/activemq
docker exec -ti activemq cat /opt/activemq/conf/activemq.xml
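
    Reading activemq.xml confirms the port inside the container; it does not confirm that the published port is reachable from the host. A minimal sketch of such a check, using only the JDK, is shown here. The class name PortCheck, the helper isPortOpen, and the 2-second timeout are illustrative choices and not part of the original example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, unreachable host, or timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // 1883 is the host port published by the docker run command above.
        boolean open = isPortOpen("localhost", 1883, 2000);
        System.out.println("MQTT port 1883 reachable: " + open);
    }
}
```

    A successful TCP connection only shows that something is listening on the port; it does not prove that the listener speaks MQTT.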

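Subscriber

    Plays the role of a mobile client: it receives pushed messages and simply prints each one. The code uses the FuseSource mqtt-client library (the org.fusesource.mqtt.client package):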

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;

public class Listener {

    public static void main(String[] args) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        mqtt.setUserName("admin");
        mqtt.setPassword("admin");

        final CallbackConnection connection = mqtt.callbackConnection();
        connection.listener(new org.fusesource.mqtt.client.Listener() {

            @Override
            public void onConnected() {

            }

            @Override
            public void onDisconnected() {

            }

            @Override
            public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
                String message = payload.utf8().toString();
                System.out.println("Receive message : " + message);
                // Acknowledge the message so the client confirms delivery to the broker.
                ack.run();
            }

            @Override
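            public void onFailure(Throwable throwable) {
                // Connection-level failure: report it instead of silently ignoring it.
                throwable.printStackTrace();
            }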
        });

        connection.connect(new Callback<Void>() {
            @Override
            public void onSuccess(Void aVoid) {
                Topic[] topics = {new Topic("mqttTest", QoS.AT_LEAST_ONCE)};
                connection.subscribe(topics, new Callback<byte[]>() {
                    @Override
                    public void onSuccess(byte[] bytes) {
                        System.out.println("subscribe success");
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        System.out.println("subscribe failed");
                    }
                });
            }

            @Override
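            public void onFailure(Throwable throwable) {
                // Connecting failed: report it instead of silently ignoring it.
                System.out.println("connect failed");
                throwable.printStackTrace();
            }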
        });

        synchronized (Listener.class) {
            while (true) {
                Listener.class.wait();
            }
        }
    }
}
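
    The subscriber ends by waiting forever on Listener.class because the callback API returns immediately and the callbacks run on other threads, so main has to block to keep the process alive. One alternative, sketched here with only the JDK, is a CountDownLatch that a callback can release to allow a clean exit. The class name LatchWait, the helper awaitShutdown, and the stand-in thread are hypothetical; in the subscriber it would be onDisconnected() or onFailure() that calls countDown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWait {

    // Blocks the calling thread until the latch is released or the timeout expires.
    // Returns true if the latch was released in time.
    static boolean awaitShutdown(CountDownLatch latch, long timeout, TimeUnit unit)
            throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for a client callback thread (hypothetical).
        Thread callbackThread = new Thread(() -> {
            System.out.println("callback fired, releasing main thread");
            done.countDown();
        });
        callbackThread.start();

        boolean released = awaitShutdown(done, 5, TimeUnit.SECONDS);
        System.out.println("released: " + released);
    }
}
```

    Unlike the wait loop, the latch gives the callbacks a way to end the program, and the timeout variant avoids hanging indefinitely if no callback ever fires.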

Publisher

    Publishes the messages. The code looks roughly like this:

import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;

public class Publisher {

    public static void main(String[] args) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        mqtt.setUserName("admin");
        mqtt.setPassword("admin");

        FutureConnection connection = mqtt.futureConnection();
        connection.connect().await();
        System.out.println("connect");

        int messageAmount = 10;
        UTF8Buffer topic = new UTF8Buffer("mqttTest");
        while (messageAmount > 0) {
            // Wait for each publish to complete before sending the next message.
            connection.publish(topic, new AsciiBuffer("test message" + messageAmount), QoS.AT_LEAST_ONCE, false).await();
            System.out.println("send message " + messageAmount);
            messageAmount -= 1;
        }

        connection.disconnect().await();
        System.out.println("disconnect");
    }
}

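Running

    Start the subscriber first so that it is already subscribed when the messages are published, then start the publisher. The publisher prints a line for each message it sends, and the subscriber prints each message it receives.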
