rabbitmq作為mqtt服務器實現websocket消息推送給瀏覽器


rabbitmq的RabbitMQ Web MQTT插件可以用來支持將rabbitmq作為MQTT協議的服務器,而websocket支持mqtt協議通信實現消息推送。因為我們目前使用rabbitmq,所以采用其作為ws的服務端(原來有過activemq的做法,其原生也支持MQTT協議)。

首先安裝RabbitMQ Web MQTT插件,如下:

rabbitmq-plugins enable rabbitmq_web_mqtt

MQTT在15675端口下的ws命名空間暴露WebSocket端點。如下:

http://IP:15675/ws

Eclipse旗下的Paho JavaScript客戶端可以使用MQTT協議實現ws通信,其使用如下:

<script src="mqttws31.js"></script>
<script>

    var wsbroker = location.hostname;  // mqtt websocket enabled broker
    var wsport = 15675; // port for above
    client = new Paho.MQTT.Client(wsbroker, wsport, "/ws",
        "myclientid_" + parseInt(Math.random() * 100, 10));
    client.onConnectionLost = function (responseObject) {
        debug("CONNECTION LOST - " + responseObject.errorMessage);
    };
    client.onMessageArrived = function (message) {
        debug("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString);
        print_first(message.payloadString);
    };
var sessionId = getSessionId();
var options = {
    timeout: 3,
    keepAliveInterval: 30,
    onSuccess: function () {
        debug("CONNECTION SUCCESS");
// 這樣就可以做到點對點通信 client.subscribe(sessionId
, {qos: 1}); }, onFailure: function (message) { debug("CONNECTION FAILURE - " + message.errorMessage); } }; if (location.protocol == "https:") { options.useSSL = true; } debug("CONNECT TO " + wsbroker + ":" + wsport); client.connect(options);

java服務端發送消息給rabbitmq:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
private static void testSendMqtt() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        // factory.setVirtualHost("");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            String sessionId = getSessionId();
            byte[] messageBodyBytes = "{'text':'Hello, world!中文'}".getBytes();
            // 這樣就可以做到點對點通信了,amq.topic是默認exchange
            channel.basicPublish("amq.topic", sessionId, null, messageBodyBytes);
        }finally {
            if (channel != null) {
                channel.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

 

參考:

https://www.rabbitmq.com/web-mqtt.html

https://www.eclipse.org/paho/clients/js/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM