前言
由於http協議為應答模式的連接,無法保持長連接於是引入了websocket套接字長連接概念,能夠保持數據持久性的交互;本篇文章將告知讀者如何使用netty實現簡單的消息推送功能
websocket請求頭
GET / HTTP/1.1
Host: 127.0.0.1:8096
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36
Upgrade: websocket
Origin: http://localhost:8056
Sec-WebSocket-Version: 13
websocket請求頭 會有 Connection 升級為 Upgrade, 並且Upgrade 屬性值為 websocket
引入依賴
引入netty和 引擎模板依賴
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.55.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>
創建WebSocketServer
創建Nio線程組,並在輔助啟動器中中注入 自定義處理器;定義套接字端口為8096;
/**
* @author lsc
* <p> </p>
*/
@Slf4j
public class WebSocketServer {
public void init(){
NioEventLoopGroup boss=new NioEventLoopGroup();
NioEventLoopGroup work=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(boss,work);
bootstrap.channel(NioServerSocketChannel.class);
// 自定義處理器
bootstrap.childHandler(new SocketChannelInitializer());
Channel channel = bootstrap.bind(8096).sync().channel();
log.info("------------webSocket服務器啟動成功-----------:"+channel);
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
log.info("---------運行出錯----------:"+e);
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
log.info("------------websocket服務器已關閉----------------");
}
}
}
SocketChannelInitializer
SocketChannelInitializer 中定義了聚合器 HttpObjectAggregator 將多個http片段消息聚合成完整的http消息,並且指定大小為65536;最后注入自定義的WebSocketHandler;
/**
* @author lsc
* <p> </p>
*/
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
//設置log監聽器
ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));
//設置解碼器
ch.pipeline().addLast("http-codec",new HttpServerCodec());
//聚合器
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
//用於大數據的分區傳輸
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//自定義業務handler
ch.pipeline().addLast("handler",new WebSocketHandler());
}
}
WebSocketHandler
WebSocketHandler 中對接收的消息進行判定,如果是websocket 消息 則將消息廣播給所有通道;
/**
* @author lsc
* <p> </p>
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
// 存放已經連接的通道
private static ConcurrentMap<String, Channel> ChannelMap=new ConcurrentHashMap();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
System.out.println("------------收到http消息--------------"+msg);
handleHttpRequest(ctx,(FullHttpRequest)msg);
}else if (msg instanceof WebSocketFrame){
//處理websocket客戶端的消息
String message = ((TextWebSocketFrame) msg).text();
System.out.println("------------收到消息--------------"+message);
// ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
// 將消息回復給所有連接
Collection<Channel> values = ChannelMap.values();
for (Channel channel: values){
channel.writeAndFlush(new TextWebSocketFrame(message));
}
}
}
/**
* @author lsc
* <p> 處理http請求升級</p>
*/
private void handleHttpRequest(ChannelHandlerContext ctx,
FullHttpRequest req) throws Exception {
// 該請求是不是websocket upgrade請求
if (isWebSocketUpgrade(req)) {
String ws = "ws://127.0.0.1:8096";
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(ws, null, false);
WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
if (handshaker == null) {// 請求頭不合法, 導致handshaker沒創建成功
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
// 響應該請求
handshaker.handshake(ctx.channel(), req);
}
return;
}
}
//n1.GET? 2.Upgrade頭 包含websocket字符串?
private boolean isWebSocketUpgrade(FullHttpRequest req) {
HttpHeaders headers = req.headers();
return req.method().equals(HttpMethod.GET)
&& headers.get(HttpHeaderNames.UPGRADE).equals("websocket");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//添加連接
log.debug("客戶端加入連接:"+ctx.channel());
Channel channel = ctx.channel();
ChannelMap.put(channel.id().asShortText(),channel);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//斷開連接
log.debug("客戶端斷開連接:"+ctx.channel());
Channel channel = ctx.channel();
ChannelMap.remove(channel.id().asShortText());
}
}
最后將WebSocketServer 注入spring監聽器,在服務啟動的時候運行;
@Slf4j
@Component
public class ApplicationConfig implements ApplicationListener<ApplicationReadyEvent> {
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
WebSocketServer webSocketServer = new WebSocketServer();
webSocketServer.init();
}
}
視圖轉發
編寫 IndexController 對視圖進行轉發
/**
* @author lsc
* <p> </p>
*/
@Controller
public class IndexController {
@GetMapping("index")
public ModelAndView index(){
ModelAndView modelAndView = new ModelAndView("socket");
return modelAndView;
}
}
html
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>用戶頁面</title>
</head>
<body>
<div id="msg"></div>
<input type="text" id="text">
<input type="submit" value="send" onclick="send()">
</body>
<script>
var msg = document.getElementById("msg");
var wsServer = 'ws://127.0.0.1:8096';
var websocket = new WebSocket(wsServer);
//監聽連接打開
websocket.onopen = function (evt) {
msg.innerHTML = "The connection is open";
};
//監聽服務器數據推送
websocket.onmessage = function (evt) {
msg.innerHTML += "<br>" + evt.data;
};
//監聽連接關閉
websocket.onclose = function (evt) {
alert("連接關閉");
};
function send() {
var text = document.getElementById("text").value
websocket.send(text);
}
</script>
</html>
附上配置文件
server:
port: 8056
spring:
# 引擎模板配置
thymeleaf:
cache: false # 關閉緩存
mode: html # 去除htm5嚴格校驗
prefix: classpath:/templates/ # 指定 thymeleaf 模板路徑
encoding: UTF-8 # 指定字符集編碼
suffix: .html
運行服務后
前端頁面顯示消息

服務端打印消息

源碼獲取:知識追尋者公眾號回復:netty
配套教程
