客戶端(springmvc)調用netty構建的nio服務端,獲得響應后返回頁面(同步響應)


后面考慮通過netty做一個真正意義的簡約版RPC框架,今天先嘗試通過正常調用邏輯調用netty構建的nio服務端並同步獲得返回信息。為后面做鋪墊

服務端實現

我們先完成服務端的邏輯,邏輯很簡單,把客戶端請求的內容加上服務器時間戳一並返回

public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,4096)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new Handler());
                        }
                    });
            System.out.println("服務啟動"+port);
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String data = (String) msg;
        System.out.println("服務端接收數據:" + data);
        data = data + " now:"+System.currentTimeMillis();
        ctx.writeAndFlush(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
    }

服務端用了LineBasedFrameDecoder,以防止半包讀寫問題,客戶端需要進行配合

客戶端實現

這個案例客戶端實現有兩個難點:

1、客戶端方法如何能請求到SimpleChannelInboundHandler,即把需要發送的消息傳到handler中

2、如何能等待服務端響應同步返回

第一個問題其實是如何把客戶端輸入的參數傳入handler,所以我們需要把參數以構造函數傳參的方式以此傳入ChannelInitializer、SimpleChannelInboundHandler,這樣handler就可以拿到客戶端輸入的數據了

下面的Controller里需要提前把channel准備好,如果RPC框架需要考慮通道與服務的關系

@RestController
public class Controller {
    static String ip = "127.0.0.1";
    static int port = 9876;
    static Bootstrap bootstrap;
    static NioEventLoopGroup worker;
    static {
        bootstrap = new Bootstrap();
        worker = new NioEventLoopGroup();
        bootstrap.group(worker);
        bootstrap.channel(NioSocketChannel .class)
        .option(ChannelOption.TCP_NODELAY, true)
        .remoteAddress(new InetSocketAddress(ip, port));
    }


    @GetMapping("/nio/netty/server")
    public String test(String param){
        CustomerChannelInitializer customerChannelInitializer = new CustomerChannelInitializer(param);
        bootstrap.handler(customerChannelInitializer);
        ChannelFuture channelFuture= null;
        String msg = "";
        try {
            channelFuture = bootstrap.connect().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return customerChannelInitializer.getResponse();
    }
}
public class CustomerChannelInitializer extends ChannelInitializer<SocketChannel> {
    private String response;
    private CountDownLatch countDownLatch;
    private String param;

    private ClientChannelHandlerAdapter clientChannelHandlerAdapter;

    public CustomerChannelInitializer(String param) {
        this.param = param;
    }
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        countDownLatch = new CountDownLatch(1);
        clientChannelHandlerAdapter = new ClientChannelHandlerAdapter(param, this);
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(clientChannelHandlerAdapter);
    }
    public String getResponse() {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return response;
    }

    public void setResponse(String response) {
        this.response = response;
        countDownLatch.countDown();
    }
}
public class ClientChannelHandlerAdapter extends SimpleChannelInboundHandler<ByteBuf> {
    private String param;
    private CustomerChannelInitializer customerChannelInitializer;

    public ClientChannelHandlerAdapter(String param, CustomerChannelInitializer customerChannelInitializer) {
        this.param = param;
        this.customerChannelInitializer = customerChannelInitializer;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("客戶端收到返回數據:" + msg.toString(CharsetUtil.UTF_8));
        customerChannelInitializer.setResponse(msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端准備發送數據");
        ctx.writeAndFlush(Unpooled.copiedBuffer(param + System.getProperty("line.separator"), CharsetUtil.UTF_8));
        System.out.println("客戶端發送數據完成");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("發生異常");
        cause.printStackTrace();
        ctx.close();
    }
}

我們來看第二個問題,由於netty是異步的,所以無法等待到服務端響應后調用客戶端的channelRead0方法,controller就已經返回了,導致了網頁顯示的返回結果一直是空

主線程通過CountDownLatch來鎖住沒有返回結果的線程,直到工作線程獲得結果並解鎖

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        countDownLatch = new CountDownLatch(1);
    ……

public String getResponse() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return response; } public void setResponse(String response) { this.response = response; countDownLatch.countDown(); }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM