場景
由於Http協議是無狀態的,每一次請求只能響應一次,下次請求需要重新連接。
如果客戶端請求一個服務端資源,需要實時監服務端執行狀態(比如導出大數據量時需要前端監控導出狀態),這個時候不斷請求連接浪費資源。可以通過WebSocket
建立一個長連接,實現客戶端與服務端雙向交流。
使用Netty
實現瀏覽器與服務端建立WebSocket
連接,互相監控狀態,客戶端發送消息服務端回寫。
服務端狀態及消息發送及回顯:
服務端讀取瀏覽器消息並監控頁面狀態
實現
服務端
服務端需要添加多個Netty
框架的Handler
,其中使用WebSocketServerProtocolHandler("/hello")
將http協議升級為WebSocket
協議,升級指定的uri需要與瀏覽器請求地址保持一致。
package others.netty.webSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* Netty實現WebSocket實例
* Http連接是無狀態的,且每一個請求只會響應一次,下次需要重新連接。服務端不可主動向客戶端發送消息
* 使用WebSocket實現一個客戶端與服務端可互相通信的償連接
*
* @author makeDoBetter
* @version 1.0
* @date 2021/4/29 15:54
* @since JDK 1.8
*/
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//基於http協議,因此添加http編解碼器
pipeline.addLast(new HttpServerCodec());
//提供大文件寫的處理器,尤其適用於大文件寫,方便管理狀態,不需要用戶過分關心
//一個{@link ChannelHandler},它增加了對寫入大型數據流的支持既不花費大量內存
// 也不獲取{@link OutOfMemoryError}。
// 大型數據流(例如文件傳輸)需要在{@link ChannelHandler}實現中進行復雜的狀態管理。
// {@link ChunkedWriteHandler}管理如此復雜的狀態以便您可以毫無困難地發送大量數據流。
pipeline.addLast(new ChunkedWriteHandler());
//將http協議下分段傳輸的數據聚合到一起,用於響應請求是需要添加在 HttpServerCodec()之后
pipeline.addLast(new HttpObjectAggregator(8192));
//將服務器協議升級為WebSocket協議保持長連接 處理握手及幀的傳遞
//升級協議是通過修改狀態碼實現的 200升級為101
//WebSocket 長連接消息傳遞是通過幀的形式進行傳遞的
//幀 繼承抽象類 WebSocketFrame 有六個子類 幀的處理由管道中下一個handler進行處理
//WebSocket請求形式 :ws://localhost:1234/hello
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new FrameHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(1234).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("服務端啟動");
} else {
System.out.println("服務端啟動失敗");
}
}
});
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
自定義處理器
自定義處理器實現瀏覽器消息的接收及回寫,以及實現其他連接及離線事件的監控。
package others.netty.webSocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* WebSocket 長連接下 文本幀的處理器
* 實現瀏覽器發送文本回寫
* 瀏覽器連接狀態監控
*
* @author makeDoBetter
* @version 1.0
* @date 2021/4/29 16:30
* @since JDK 1.8
*/
public class FrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//使用msg.text()獲得幀中文本
System.out.println(msg.text());
//回寫,需要封裝成TextWebSocketFrame 對象寫入到通道中
ctx.channel().writeAndFlush(new TextWebSocketFrame("【服務端】" + LocalDateTime.now() + msg.text()));
}
/**
* 出現異常的處理 打印報錯日志
*
* @param ctx the ctx
* @param cause the cause
* @throws Exception the Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
//關閉上下文
ctx.close();
}
/**
* 監控瀏覽器上線
*
* @param ctx the ctx
* @throws Exception the Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asShortText() + "連接");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asShortText() + "斷開連接");
}
}
客戶端
客戶端思路:
- 如果瀏覽器支持
WebSocket
,創建一個WebSocket
連接,后使用socket
變量監控各個事件,並給出相應事件的發生; - 發送消息時先校驗是否允許
WebSocket
,並在socket.readyState == WebSocket.OPEN
狀態下進行消息發送。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:1234/hello");
//消息獲得事件
socket.onmessage = function (ev) {
alert("收到消息");
var id = document.getElementById('getMessage');
id.value = id.value + '\n' + ev.data;
};
socket.onopen = function (ev) {
var id = document.getElementById('getMessage');
id.value = '連接服務器成功';
};
socket.onclose = function (ev) {
var id = document.getElementById('getMessage');
id.value = id.value + '\n' + '服務器連接關閉';
}
} else {
alert("瀏覽器不支持WebSocket");
}
function send(message) {
if (!window.socket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
}
}
</script>
<form onsubmit="return false">
<textarea name="textarea1" type="text" style="width: 300px; height: 400px"></textarea>
<input type="button" onclick="send(this.form.textarea1.value)" value="發送">
<textarea id="getMessage" type="text" style="width: 300px; height: 400px"></textarea>
<input type="button" onclick="document.getElementById('getMessage').value=''" value="清空">
</form>
</body>
</html>