netty同時實現http與socket


(1)啟動類

package test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point that boots the Netty server.
 *
 * <p>The listening port is read from the first command-line argument and falls back to
 * {@link #DEFAULT_PORT} when no argument is supplied.
 *
 * @author songyan
 */
public class HttpProxyServer {

    /** Port the server listens on when none is given on the command line. */
    private static final int DEFAULT_PORT = 5688;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args optional; {@code args[0]} is parsed as the port number
     * @throws Exception if the port argument is not a number, or binding/waiting fails
     */
    public static void main(String[] args) throws Exception {
        final int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = DEFAULT_PORT;
        }
        System.out.println("Proxying on port " + port);

        // Boss/worker model: a single-threaded group for the server channel,
        // and a default-sized group for the accepted child channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            // Every accepted connection gets its pipeline from ServerInitialzer.
            bootstrap.childHandler(new ServerInitialzer());

            // Bind, wait for the bind to finish, then park this thread until the
            // server channel is closed.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } finally {
            // Always stop both event loop groups, even if startup failed.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

(2)初始化類

package test;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Channel initializer shared by every accepted connection.
 *
 * <p>Installs the HTTP handlers, then the WebSocket protocol handler, then the custom
 * {@code HttpHandler}, in that order.
 *
 * @author songyan
 */
public class ServerInitialzer extends ChannelInitializer<SocketChannel> {

    /** Value handed to {@code HttpObjectAggregator}: 10 * 1024 * 1024 bytes. */
    private static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;

    /** Path handed to {@code WebSocketServerProtocolHandler}. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param ch the accepted channel
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // HTTP request decoding / response encoding; a WebSocket connection
                // starts out as an HTTP request, so this must come first.
                .addLast(new HttpServerCodec())
                // Support for writing large data streams.
                .addLast(new ChunkedWriteHandler())
                // Aggregates HTTP message parts, bounded by MAX_CONTENT_LENGTH.
                .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))
                // WebSocket protocol handling for clients connecting on WEBSOCKET_PATH.
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // Custom routing between HTTP requests and WebSocket frames.
                .addLast(new HttpHandler());
    }

}

注:

new WebSocketServerProtocolHandler("/ws")只能攔截uri為ws://127.0.0.1:5688/ws的請求

比如我想匹配請求:ws://192.168.11.3:5688/gxgd/echo?fromUser=301208

則應該new WebSocketServerProtocolHandler("/gxgd/echo?fromUser=301208")

顯然這樣寫是不合理的,我們的參數是不確定的,是動態的,但是如果這樣寫的話,是完全匹配,一點不一樣就會報404。

看他的構造函數發現有

 // Two-argument convenience constructor: forwards to the six-argument constructor,
 // passing websocketPath first and checkStartsWith last, with fixed values in between.
 public WebSocketServerProtocolHandler(String websocketPath, boolean checkStartsWith) {
        this(websocketPath, null, false, 65536, false, checkStartsWith);
    }

第一個參數是路徑,第二個參數 checkStartsWith 表示是否按前綴(startsWith)匹配。也就是說,把第二個參數設置成true之後,只要請求路徑是以第一個參數開頭的就可以匹配了

例如:

new WebSocketServerProtocolHandler("/gxgd/echo",true)

可以匹配

ws://192.168.11.3:5688/gxgd/echo?fromUser=301208

這樣就不會出現更改參數報404的錯誤了

 

(3)自定義路由

package test;

import java.time.LocalDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 自定義的路由 既可以實現http又可以實現socket
 * 
 * @author songyan
 *
 */
public class HttpHandler extends SimpleChannelInboundHandler<Object> {
    // 用於記錄和管理所有客戶端的channle
    private Channel outboundChannel;
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 打開鏈接
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: active");
        super.channelActive(ctx);
    }

    /**
     * 獲取客戶端的channle,添加到ChannelGroup中
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: add");
        clients.add(ctx.channel());
    }

    /**
     * 從ChannelGroup中移除channel
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: Removed");
    }

    /**
     * 銷毀channel
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: destroyed");
        if (clients != null) {
            closeOnFlush(outboundChannel);
        }
    }

    /**
     * 關閉釋放channel
     * @param ch
     */
    static void closeOnFlush(Channel ch) {
        if (ch != null && ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    
    /**
     * 異常捕獲
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("出錯了");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 路由
     * 對http,websocket單獨處理
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {// 如果是HTTP請求,進行HTTP操作
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {// 如果是Websocket請求,則進行websocket操作
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    
    /**
     * 對http請求的處理
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, final FullHttpRequest msg) {
        final Channel inboundChannel = ctx.channel();
        String host = msg.headers().get("Host");
        int port = 80;

        String pattern = "(http://|https://)?([^:]+)(:[\\d]+)?";
        Pattern r = Pattern.compile(pattern);
        Matcher m = r.matcher(host);
        if (m.find()) {
            host = m.group(2);
            port = (m.group(3) == null) ? 80 : Integer.parseInt(m.group(3).substring(1));
        }

        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop()) // use inboundChannel thread
                .channel(ctx.channel().getClass()).handler(new BackendHandlerInitializer(inboundChannel));

        ChannelFuture f = b.connect("127.0.0.1", 8015);
        outboundChannel = f.channel();
        msg.retain();
        ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    outboundChannel.writeAndFlush(msg);
                } else {
                    inboundChannel.close();
                }
            }
        });
    }

    /**
     * 對socket請求的處理
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        // 獲取客戶端傳輸過來的消息
        String content = msg.toString();
        System.out.println("websocket:::  接受到的數據:" + content);
        clients.writeAndFlush(new TextWebSocketFrame("[服務器在]" + LocalDateTime.now() + "接受到消息, 消息為:" + content));

    }

}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM