netty實現websocket推送消息


前言

由於http協議為應答模式的連接,無法保持長連接於是引入了websocket套接字長連接概念,能夠保持數據持久性的交互;本篇文章將告知讀者如何使用netty實現簡單的消息推送功能

websocket請求頭

GET / HTTP/1.1
Host: 127.0.0.1:8096
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36
Upgrade: websocket
Origin: http://localhost:8056
Sec-WebSocket-Version: 13

websocket請求頭 會有 Connection 升級為 Upgrade, 並且Upgrade 屬性值為 websocket

引入依賴

引入netty和 引擎模板依賴

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.55.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
    </dependencies>

創建WebSocketServer

創建Nio線程組,並在輔助啟動器中中注入 自定義處理器;定義套接字端口為8096;

/**
 * @author lsc
 * <p> </p>
 */
@Slf4j
public class WebSocketServer {

    public void init(){
        NioEventLoopGroup boss=new NioEventLoopGroup();
        NioEventLoopGroup work=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(boss,work);
            bootstrap.channel(NioServerSocketChannel.class);
            // 自定義處理器
            bootstrap.childHandler(new SocketChannelInitializer());
            Channel channel = bootstrap.bind(8096).sync().channel();
            log.info("------------webSocket服務器啟動成功-----------:"+channel);
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.info("---------運行出錯----------:"+e);
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
            log.info("------------websocket服務器已關閉----------------");
        }
    }
}

SocketChannelInitializer

SocketChannelInitializer 中定義了聚合器 HttpObjectAggregator 將多個http片段消息聚合成完整的http消息,並且指定大小為65536;最后注入自定義的WebSocketHandler;

/**
 * @author lsc
 * <p> </p>
 */
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        //設置log監聽器
        ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));
        //設置解碼器
        ch.pipeline().addLast("http-codec",new HttpServerCodec());
        //聚合器
        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
        //用於大數據的分區傳輸
        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
        //自定義業務handler
        ch.pipeline().addLast("handler",new WebSocketHandler());
    }
}

WebSocketHandler

WebSocketHandler 中對接收的消息進行判定,如果是websocket 消息 則將消息廣播給所有通道;

/**
 * @author lsc
 * <p> </p>
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object>  {

    // 存放已經連接的通道
    private  static ConcurrentMap<String, Channel> ChannelMap=new ConcurrentHashMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest){

            System.out.println("------------收到http消息--------------"+msg);
            handleHttpRequest(ctx,(FullHttpRequest)msg);
        }else if (msg instanceof WebSocketFrame){
            //處理websocket客戶端的消息
            String message = ((TextWebSocketFrame) msg).text();
            System.out.println("------------收到消息--------------"+message);
//            ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
            // 將消息回復給所有連接
            Collection<Channel> values = ChannelMap.values();
            for (Channel channel: values){
                channel.writeAndFlush(new TextWebSocketFrame(message));
            }
        }

    }

    /**
     * @author lsc
     * <p> 處理http請求升級</p>
     */
    private void handleHttpRequest(ChannelHandlerContext ctx,
                                   FullHttpRequest req) throws Exception {

        // 該請求是不是websocket upgrade請求
        if (isWebSocketUpgrade(req)) {
            String ws = "ws://127.0.0.1:8096";
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(ws, null, false);
            WebSocketServerHandshaker handshaker = factory.newHandshaker(req);

            if (handshaker == null) {// 請求頭不合法, 導致handshaker沒創建成功
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                // 響應該請求
                handshaker.handshake(ctx.channel(), req);
            }
            return;
        }
    }

    //n1.GET? 2.Upgrade頭 包含websocket字符串?
    private boolean isWebSocketUpgrade(FullHttpRequest req) {
        HttpHeaders headers = req.headers();
        return req.method().equals(HttpMethod.GET)
                && headers.get(HttpHeaderNames.UPGRADE).equals("websocket");
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加連接
        log.debug("客戶端加入連接:"+ctx.channel());
        Channel channel = ctx.channel();
        ChannelMap.put(channel.id().asShortText(),channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //斷開連接
        log.debug("客戶端斷開連接:"+ctx.channel());
        Channel channel = ctx.channel();
        ChannelMap.remove(channel.id().asShortText());
    }
}

最后將WebSocketServer 注入spring監聽器,在服務啟動的時候運行;

@Slf4j
@Component
public class ApplicationConfig implements ApplicationListener<ApplicationReadyEvent> {



    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        WebSocketServer webSocketServer = new WebSocketServer();
        webSocketServer.init();
    }
}

視圖轉發

編寫 IndexController 對視圖進行轉發

/**
 * @author lsc
 * <p> </p>
 */
@Controller
public class IndexController {

    @GetMapping("index")
    public ModelAndView index(){
        ModelAndView modelAndView = new ModelAndView("socket");
        return modelAndView;
    }
}

html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>用戶頁面</title>
</head>
<body>
<div id="msg"></div>
<input type="text" id="text">
<input type="submit" value="send" onclick="send()">
</body>
<script>
    var msg = document.getElementById("msg");
    var wsServer = 'ws://127.0.0.1:8096';
    var websocket = new WebSocket(wsServer);
    //監聽連接打開
    websocket.onopen = function (evt) {
        msg.innerHTML = "The connection is open";
    };

    //監聽服務器數據推送
    websocket.onmessage = function (evt) {
        msg.innerHTML += "<br>" + evt.data;
    };

    //監聽連接關閉
    websocket.onclose = function (evt) {
        alert("連接關閉");
    };

    function send() {
        var text = document.getElementById("text").value
        websocket.send(text);
    }
</script>
</html>

附上配置文件

server:
  port: 8056

spring:
  # 引擎模板配置
  thymeleaf:
    cache: false # 關閉緩存
    mode: html # 去除htm5嚴格校驗
    prefix: classpath:/templates/ # 指定 thymeleaf 模板路徑
    encoding: UTF-8 # 指定字符集編碼
    suffix: .html

運行服務后

前端頁面顯示消息

服務端打印消息

源碼獲取:知識追尋者公眾號回復:netty

配套教程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM