一、編碼器、解碼器
... ...
@Autowired
private HttpRequestHandler httpRequestHandler;
@Autowired
private TextWebSocketFrameHandler textWebSocketFrameHandler;
... ...
.childHandler(new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// WebSocket 是基於 Http 協議的,要使用 Http 解編碼器
channel.pipeline().addLast("http-codec", new HttpServerCodec());
// 用於大數據流的分區傳輸
channel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
// 將多個消息轉換為單一的 request 或者 response 對象,最終得到的是 FullHttpRequest 對象
channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
// 創建 WebSocket 之前會有唯一一次 Http 請求 (Header 中包含 Upgrade 並且值為 websocket)
channel.pipeline().addLast("http-request",httpRequestHandler);
// 處理所有委托管理的 WebSocket 幀類型以及握手本身
// 入參是 ws://server:port/context_path 中的 contex_path
channel.pipeline().addLast("websocket-server", new WebSocketServerProtocolHandler(socketUri));
// WebSocket RFC 定義了 6 種幀,TextWebSocketFrame 是我們唯一真正需要處理的幀類型
channel.pipeline().addLast("text-frame",textWebSocketFrameHandler);
}
});
... ...
其中 HttpRequestHandler 和 TextWebSocketFrameHandler 是自定義 Handler
1.1 HttpRequestHandler
@Component
@ChannelHandler.Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestHandler.class);
@Value("${server.socket-uri}")
private String socketUri;
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
if (msg.uri().startsWith(socketUri)) {
String userId = UriUtil.getParam(msg.uri(), "userId");
if (userId != null) {
// todo: 用戶校驗,重復登錄判斷
ChannelSupervise.addChannel(userId, ctx.channel());
ctx.fireChannelRead(msg.setUri(socketUri).retain());
} else {
ctx.close();
}
} else {
ctx.close();
}
}
}
1.2 TextWebSocketFrameHandler
@Component
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger LOGGER = LoggerFactory.getLogger(TextWebSocketFrameHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
ctx.pipeline().remove(HttpRequestHandler.class);
}
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String requestMsg = msg.text();
String responseMsg = "服務端接收客戶端消息:" + requestMsg;
TextWebSocketFrame resp = new TextWebSocketFrame(responseMsg);
ctx.writeAndFlush(resp.retain());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
LOGGER.error(ctx.channel().id().asShortText(), cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
ChannelSupervise.removeChannel(ctx.channel());
LOGGER.info("[%s]斷開連接", ctx.channel().id().asShortText());
}
}
二、主動向客戶端推送消息
2.1 推送工具類
public class ChannelSupervise {
private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ConcurrentMap<String, ChannelId> UserChannelMap = new ConcurrentHashMap();
private static ConcurrentMap<String, String> ChannelUserMap = new ConcurrentHashMap();
public static void addChannel(String userId, Channel channel){
GlobalGroup.add(channel);
UserChannelMap.put(userId, channel.id());
ChannelUserMap.put(channel.id().asShortText(), userId);
}
public static void removeChannel(Channel channel){
GlobalGroup.remove(channel);
String userId = ChannelUserMap.get(channel.id().asShortText());
UserChannelMap.remove(userId);
ChannelUserMap.remove(channel.id().asShortText());
}
public static void sendToUser(String userId, String msg){
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(msg);
Channel channel = GlobalGroup.find(UserChannelMap.get(userId));
channel.writeAndFlush(textWebSocketFrame);
}
public static void sendToAll(String msg){
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(msg);
GlobalGroup.writeAndFlush(textWebSocketFrame);
}
}
支持向具體某個客戶端發送消息,或者群發消息
2.2 推送接口
@RestController
public class WebsocketController {
@RequestMapping("sendToAll")
public void sendToAll(String msg) {
ChannelSupervise.sendToAll(msg);
}
@RequestMapping("sendToUser")
public void sendToUser(String userId, String msg) {
ChannelSupervise.sendToUser(userId, msg);
}
}
三、測試
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket客戶端</title>
</head>
<body>
<script type="text/javascript">
var socket;
function connect(){
var userId = document.getElementById('userId').value;
if(window.WebSocket){
// 參數就是與服務器連接的地址
// socket = new WebSocket('ws://localhost:8081/ws');
socket = new WebSocket('ws://localhost:8081/ws?userId=' + userId);
// 客戶端收到服務器消息的時候就會執行這個回調方法
socket.onmessage = function (event) {
var response = document.getElementById('response');
response.innerHTML = response.innerHTML
+ '<p style="color:LimeGreen;"> 接收:' + event.data + '</p>';
}
// 連接建立的回調函數
socket.onopen = function(event){
var status = document.getElementById('status');
status.innerHTML = '<p style="color:YellowGreen;">WebSocket 連接開啟</p>';
}
// 連接斷掉的回調函數
socket.onclose = function (event) {
var status = document.getElementById('status');
status.innerHTML = '<p style="color:Red;">WebSocket 連接關閉</p>';
}
}else{
var status = document.getElementById('status');
status.innerHTML = '<p style="color:Red;">瀏覽器不支持 WebSocket</p>';
}
}
// 發送數據
function send(message){
if(!window.WebSocket){
return;
}
var ta = document.getElementById('response');
ta.innerHTML = ta.innerHTML + '<p style="color:SkyBlue;"> 發送:' + message + '</p>';
// 當websocket狀態打開
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
var response = document.getElementById("response");
response.innerHTML = '<p style="color:Red;">連接沒有開啟</p>';
}
}
</script>
<form onsubmit="return false">
<label for="userId">用戶ID:</label>
<input type="text" name="userId" id="userId" />
<input type ="button" value="連接服務器" onclick="connect();">
</form>
<div id ="status"></div>
<form onsubmit="return false">
<input name = "message" style="width: 200px;"></input>
<input type ="button" value="發送消息" onclick="send(this.form.message.value);">
</form>
<div id ="response"></div>
<input type="button" onclick="javascript:document.getElementById('response').innerHTML=''" value="清空消息">
</body>
</html>
注意
因為自定義 Handler 使用依賴注入實例化,所以需要添加 @ChannelHandler.Sharable 注解,否則會報錯:is not a @Sharable handler, so can’t be added or removed multiple times.
參考
完整代碼:GitHub