后面考慮通過netty做一個真正意義的簡約版RPC框架,今天先嘗試通過正常調用邏輯調用netty構建的nio服務端並同步獲得返回信息。為后面做鋪墊
服務端實現
我們先完成服務端的邏輯,邏輯很簡單,把客戶端請求的內容加上服務器時間戳一並返回
public void run() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,4096) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new Handler()); } }); System.out.println("服務啟動"+port); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); }finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String data = (String) msg; System.out.println("服務端接收數據:" + data); data = data + " now:"+System.currentTimeMillis(); ctx.writeAndFlush(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8)); }
服務端用了LineBasedFrameDecoder,以防止半包讀寫問題,客戶端需要進行配合
客戶端實現
這個案例客戶端實現有兩個難點:
1、客戶端方法如何能請求到SimpleChannelInboundHandler,即把需要發送的消息傳到handler中
2、如何能等待服務端響應同步返回
第一個問題其實是如何把客戶端輸入的參數傳入handler,所以我們需要把參數以構造函數傳參的方式以此傳入ChannelInitializer、SimpleChannelInboundHandler,這樣handler就可以拿到客戶端輸入的數據了
下面的Controller里需要提前把channel准備好,如果RPC框架需要考慮通道與服務的關系
@RestController public class Controller { static String ip = "127.0.0.1"; static int port = 9876; static Bootstrap bootstrap; static NioEventLoopGroup worker; static { bootstrap = new Bootstrap(); worker = new NioEventLoopGroup(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel .class) .option(ChannelOption.TCP_NODELAY, true) .remoteAddress(new InetSocketAddress(ip, port)); } @GetMapping("/nio/netty/server") public String test(String param){ CustomerChannelInitializer customerChannelInitializer = new CustomerChannelInitializer(param); bootstrap.handler(customerChannelInitializer); ChannelFuture channelFuture= null; String msg = ""; try { channelFuture = bootstrap.connect().sync(); } catch (InterruptedException e) { e.printStackTrace(); } return customerChannelInitializer.getResponse(); } }
public class CustomerChannelInitializer extends ChannelInitializer<SocketChannel> { private String response; private CountDownLatch countDownLatch; private String param; private ClientChannelHandlerAdapter clientChannelHandlerAdapter; public CustomerChannelInitializer(String param) { this.param = param; } @Override protected void initChannel(SocketChannel ch) throws Exception { countDownLatch = new CountDownLatch(1); clientChannelHandlerAdapter = new ClientChannelHandlerAdapter(param, this); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(clientChannelHandlerAdapter); } public String getResponse() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return response; } public void setResponse(String response) { this.response = response; countDownLatch.countDown(); } }
public class ClientChannelHandlerAdapter extends SimpleChannelInboundHandler<ByteBuf> { private String param; private CustomerChannelInitializer customerChannelInitializer; public ClientChannelHandlerAdapter(String param, CustomerChannelInitializer customerChannelInitializer) { this.param = param; this.customerChannelInitializer = customerChannelInitializer; } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("客戶端收到返回數據:" + msg.toString(CharsetUtil.UTF_8)); customerChannelInitializer.setResponse(msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端准備發送數據"); ctx.writeAndFlush(Unpooled.copiedBuffer(param + System.getProperty("line.separator"), CharsetUtil.UTF_8)); System.out.println("客戶端發送數據完成"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("發生異常"); cause.printStackTrace(); ctx.close(); } }
我們來看第二個問題,由於netty是異步的,所以無法等待到服務端響應后調用客戶端的channelRead0方法,controller就已經返回了,導致了網頁顯示的返回結果一直是空
主線程通過CountDownLatch來鎖住沒有返回結果的線程,直到工作線程獲得結果並解鎖
@Override protected void initChannel(SocketChannel ch) throws Exception { countDownLatch = new CountDownLatch(1); ……
public String getResponse() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return response; } public void setResponse(String response) { this.response = response; countDownLatch.countDown(); }