一款基於Netty開發的WebSocket服務器
這是一款基於Netty框架開發的服務端,通信協議為WebSocket。主要用於Java后台服務器向瀏覽器進行消息推送。
需求
對於一個Web項目而言,客戶端一般均為各種各樣的瀏覽器,如何從后端服務器向瀏覽器客戶端進行消息推送,便成了一個棘手的問題,好在在HTTP1.1之后,HTTP可以支持長連接,由此,我在Netty框架的基礎上開發了這個WebSocket服務端。
當然,你依舊可以下載源碼進行測試,集成,二次開發等等。
環境
- Intellij IDEA2018
- JDK 1.8
- 插件:Simple WebSocket Client 0.1.3
- FireFox Quantum 60.0.1 (64 位)
- Google Chrome 67.0.3396.99(正式版本)(64 位)
運行結果
請看下圖:

實現步驟及源碼
WebSocket
WebSocket是HTML5開始提供的一種瀏覽器與服務器間進行全雙工通訊的網絡技術。 WebSocket通信協議於2011年被IETF定為標准RFC 6455,WebSocketAPI被W3C定為標准。 在WebSocket API中,瀏覽器和服務器只需要要做一個握手的動作,然后,瀏覽器和服務器之間就形成了一條快速通道。兩者之間就直接可以數據互相傳送。
目錄結構

核心類講解
- WebSocketChildHandler
- WebSocketServerHandler
WebSocketChildHandler
這個類的主要作用就是為Netty的通道注冊事件。其核心代碼見下,其中webSocketUrl就是客戶端與服務端進行連接的請求路徑,我將其寫入了配置文件,交由Spring管理,以注入的方式傳遞到WebSocketServerHandler中。
ChannelPipeline pipeline = socketChannel.pipeline();
// 將請求與應答消息編碼或者解碼為HTTP消息
pipeline.addLast("http-codec", new HttpServerCodec());
// 將http消息的多個部分組合成一條完整的HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(HttpObjectConstant.MAX_CONTENT_LENGTH));
// 向客戶端發送HTML5文件。主要用於支持瀏覽器和服務端進行WebSocket通信
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
// 服務端Handler
pipeline.addLast("handler", new WebSocketServerHandler(webSocketUrl));
WebSocketServerHandler
這個類是真正的核心類,這個類的主要功能為:
- 進行第一次握手
- 對消息進行處理
- 可以實現點對點通信
- 可以實現廣播功能
- 可以實現點對端通信
/**
* 接收客戶端發送的消息
*
* @param channelHandlerContext ChannelHandlerContext
* @param receiveMessage 消息
*/
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object receiveMessage) throws Exception {
// 傳統http接入 第一次需要使用http建立握手
if (receiveMessage instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) receiveMessage;
LOGGER.info("├ [握手]: {}", fullHttpRequest.uri());
// 握手
handlerHttpRequest(channelHandlerContext, fullHttpRequest);
// 發送連接成功給客戶端
channelHandlerContext.channel().write(new TextWebSocketFrame("連接成功"));
}
// WebSocket接入
else if (receiveMessage instanceof WebSocketFrame) {
WebSocketFrame webSocketFrame = (WebSocketFrame) receiveMessage;
handlerWebSocketFrame(channelHandlerContext, webSocketFrame);
}
}
/**
* 第一次握手
*
* @param channelHandlerContext channelHandlerContext
* @param req 請求
*/
private void handlerHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest req) {
// 構造握手響應返回,本機測試
WebSocketServerHandshakerFactory wsFactory
= new WebSocketServerHandshakerFactory(webSocketUrl, Constant.NULL, Constant.FALSE);
// region 從連接路徑中截取連接用戶名
String uri = req.uri();
int i = uri.lastIndexOf("/");
String userName = uri.substring(i + 1, uri.length());
// endregion
Channel connectChannel = channelHandlerContext.channel();
// 加入在線用戶
WebSocketUsers.put(userName, connectChannel);
socketServerHandShaker = wsFactory.newHandshaker(req);
if (socketServerHandShaker == null) {
// 發送版本錯誤
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(connectChannel);
} else {
// 握手響應
socketServerHandShaker.handshake(connectChannel, req);
}
}
/**
* webSocket處理邏輯
*
* @param channelHandlerContext channelHandlerContext
* @param frame webSocketFrame
*/
private void handlerWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame) throws IOException {
Channel channel = channelHandlerContext.channel();
// region 判斷是否是關閉鏈路的指令
if (frame instanceof CloseWebSocketFrame) {
LOGGER.info("├ 關閉與客戶端[{}]鏈接", channel.remoteAddress());
socketServerHandShaker.close(channel, (CloseWebSocketFrame) frame.retain());
return;
}
// endregion
// region 判斷是否是ping消息
if (frame instanceof PingWebSocketFrame) {
LOGGER.info("├ [Ping消息]");
channel.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// endregion
// region 純文本消息
if (frame instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) frame).text();
LOGGER.info("├ [{} 接收到客戶端的消息]: {}", new Date(), text);
channel.write(new TextWebSocketFrame(new Date() + " 服務器將你發的消息原樣返回:" + text));
}
// endregion
// region 二進制消息 此處使用了MessagePack編解碼方式
if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) frame;
ByteBuf content = binaryWebSocketFrame.content();
LOGGER.info("├ [二進制數據]:{}", content);
final int length = content.readableBytes();
final byte[] array = new byte[length];
content.getBytes(content.readerIndex(), array, 0, length);
MessagePack messagePack = new MessagePack();
WebSocketMessageEntity webSocketMessageEntity = messagePack.read(array, WebSocketMessageEntity.class);
LOGGER.info("├ [解碼數據]: {}", webSocketMessageEntity);
WebSocketUsers.sendMessageToUser(webSocketMessageEntity.getAcceptName(), webSocketMessageEntity.getContent());
}
// endregion
}
至此,服務端算是開發完成。但可以看出,服務端中仍有很大的發展空間,細心的同學可以發現我在第一次握手時,將Channel存儲了起來,對於上述的三種情況也有簡易的實現方案。
如果有必要,我也會將非瀏覽器客戶端代碼(非Js客戶端)寫成例子,共享出來。
一款基於Netty開發的WebSocket服務器
注:本文著作權歸作者,由demo大師代發,拒絕轉載,轉載需要作者授權
