7.MQTT網頁客戶端連接MQTT服務器的問題WebSocket connection to 'ws://XXX:1883/' failed: Connection closed before receiving a handshake response


問題描述:
MQTT.js提供了連接MQTT的一套javascipt的方法,可供前端連接到MQTT服務器,也可以作為腳本測試。
以腳本形式,用nodejs運行,是沒有問題的,能夠正常連接並且發送報文。
但是如果把js代碼放到HTML文件中,就不能正常完成連接,提示:
客戶端提示:

 

服務器提示:

 

問題解決;
根據客戶端提示,是無法完成握手連接,根據服務器提示,是因為解包的時候,包不符合格式,導致了連接拒絕。
通過查閱文獻,發現普通的socket和websocket是不一樣的。
直接運行腳本使用的是socket.io 而 瀏覽器使用的是websocket,而處理這兩種報文是不一樣的。
所以猜測是MQTT並沒有打開websocket支持,而是把過來的包當做普通的socket包處理了。
查閱代碼發現:
我服務器使用的是moquette,里面的nettyAcceptor已經明確說明了TCP和websocket的不同處理,可以看到代碼中websocket是關閉的:

private void initializePlainTCPTransport(final NettyMQTTHandler handler, IConfig props) throws IOException {
final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
if (DISABLED_PORT_BIND.equals(tcpPortProp)) {
LOG.info("tcp MQTT is disabled because the value for the property with key {}", BrokerConstants.PORT_PROPERTY_NAME);
return;
}
int port = Integer.parseInt(tcpPortProp);
initFactory(host, port, new PipelineInitializer() {
@Override
void init(ChannelPipeline pipeline) {
pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
pipeline.addLast("decoder", new MQTTDecoder());
pipeline.addLast("encoder", new MQTTEncoder());
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
// pipeline.addLast("messageLogger", new MQTTMessageLogger());
pipeline.addLast("handler", handler);
}
});
}

private void initializeWebSocketTransport(final NettyMQTTHandler handler, IConfig props) throws IOException {
String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
//Do nothing no WebSocket configured
LOG.info("WebSocket is disabled");
return;
}
else{
LOG.info("WebSocket is enable");
}
int port = Integer.valueOf(webSocketPortProp);

final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();

String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
initFactory(host, port, new PipelineInitializer() {
@Override
void init(ChannelPipeline pipeline) {
pipeline.addLast("httpEncoder", new HttpResponseEncoder());
pipeline.addLast("httpDecoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
pipeline.addLast("decoder", new MQTTDecoder());
pipeline.addLast("encoder", new MQTTEncoder());
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
pipeline.addLast("handler", handler);
}
});
}

 

解決方法:
設置一個端口,我設為1885,將websocket服務打開就可以了。

String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME);
        if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
            //Do nothing no WebSocket configured
            LOG.info("WebSocket is disabled");
            return;
        }
        else{
            LOG.info("WebSocket is enable");
        }
        int port = Integer.valueOf(webSocketPortProp);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM