精通並發與 Netty (一)如何使用


精通並發與 Netty

Netty 是一個異步的,事件驅動的網絡通信框架,用於高性能的基於協議的客戶端和服務端的開發。

異步指的是會立即返回,並不知道到底發送過去沒有,成功沒有,一般都會使用監聽器來監聽返回。

事件驅動是指開發者只需要關注事件對應的回調方法即可,比如 channel active,inactive,read 等等。

網絡通信框架就不用解釋了,很多你非常熟悉的組件都使用了 netty,比如 spark,dubbo 等等。

初步了解 Netty

第一個簡單的例子,使用 Netty 實現一個 http 服務器,客戶端調用一個沒有參數的方法,服務端返回一個 hello world。

Netty 里面大量的代碼都是對線程的處理和 IO 的異步的操作。

package com.paul;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Server {

    public static void main(String[] args) throws InterruptedException {
        //定義兩個線程組,事件循環組,可以類比與 Tomcat 就是死循環,不斷接收客戶端的連接
        // boss 線程組不斷從客戶端接受連接,但不處理,由 worker 線程組對連接進行真正的處理
        // 一個線程組其實也能完成,推薦使用兩個
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服務端啟動器,可以輕松的啟動服務端的 channel
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //group 方法有兩個,一個接收一個參數,另一個接收兩個參數
            // childhandler 是我們自己寫的請求處理器
            serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
                    .childHandler(new ServerInitializer());
            //綁定端口
            ChannelFuture future = serverBootstrap.bind(8011).sync();
            //channel 關閉的監聽
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

package com.paul;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面可以有很多 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        //HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的組合,編碼和解碼的 h      handler
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        pipeline.addLast("handler", new ServerHandler());
    }
}

package com.paul;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if(httpObject instanceof HttpRequest) {
            ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            //單純的調用 write 只會放到緩存區,不會真的發送
            channelHandlerContext.writeAndFlush(response);
        }
    }
}

我們在 SimpleChannelInboundHandler 里分析一下,先看它繼承的 ChannelInboundHandlerAdapter 里面的事件回調方法,包括通道注冊,解除注冊,Active,InActive等等。

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelRegistered();
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelUnregistered();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelActive();
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelInactive();
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  ctx.fireChannelRead(msg);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelReadComplete();
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  ctx.fireUserEventTriggered(evt);
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  ctx.fireChannelWritabilityChanged();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  ctx.fireExceptionCaught(cause);
}

執行順序為 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。

Netty 本身並不是遵循 servlet 規范的。Http 是基於請求和響應的無狀態協議。Http 1.1 是有 keep-alived 參數的,如果3秒沒有返回,則服務端主動關閉了解,Http 1.0 則是請求完成直接返回。

Netty 的連接會被一直保持,我們需要自己去處理這個功能。

在服務端發送完畢數據后,可以在服務端關閉 Channel。

ctx.channel.close();

Netty 能做什么

  1. 可以當作一個 http 服務器,但是他並沒有實現 servelt 規范。雖然 Tomcat 底層本身也使用 NIO,但是 Netty 本身的特點決定了它比 Tomcat 的吞吐量更高。相比於 SpringMVC 等框架,Netty 沒提供路由等功能,這也契合和 Netty 的設計思路,它更貼近底層。
  2. Socket 開發,也是應用最為廣泛的領域,底層傳輸的最基礎框架,RPC 框架底層多數采用 Netty。直接采用 Http 當然也可以,但是效率就低了很多了。
  3. 支持長連接的開發,消息推送,聊天,服務端向客戶端推送等等都會采用 WebSocket 協議,就是長連接。

Netty 對 Socket 的實現

對於 Http 編程來說,我們實現了服務端就可以了,客戶端完全可以使用瀏覽器或者 CURL 工具來充當。但是對於 Socket 編程來說,客戶端也得我們自己實現。

服務器端:

Server 類於上面 Http 服務器那個一樣,在 ServerInitoalizer 有一些變化

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面可以有很多 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串編碼,解碼
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ServerHandler());

    }
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端:

public class Client {

    public static void main(String[] args) throws InterruptedException {
        //客戶端不需要兩個 group,只需要一個就夠了,直接連接服務端發送數據就可以了
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            //服務器端既可以使用 handler 也可以使用 childhandler, 客戶端一般使用 handler
            //對於 服務端,handler 是針對 bossgroup的,childhandler 是針對 workergorup 的
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道,管道里面可以有很多 handler,一層層過濾的柑橘
        ChannelPipeline pipeline = socketChannel.pipeline();
        // TCP 粘包 拆包
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 字符串編碼,解碼
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new ClientHandler());

    }
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        System.out.println("client output:"+ msg);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("23123");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty 長連接實現一個聊天室

Server 端:

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    //定義 channel group 來管理所有 channel
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {


    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "加入\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "離開\n");
        //這個 channel 會被自動從 channelGroup 里移除

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上線");

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "離開");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client 端:

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
  channel.writeAndFlush(br.readLine() + "\r\n");
}

Netty 心跳

集群之間各個節點的通信,主從節點之間需要進行數據同步,每當主節點的數據發生變化時,通過異步的方式將數據同步到從節點,同步方式可以用日志等等,因此主從節點之間不是實時一致性而是最終一致性。

節點與節點之間如何進行通信那?這種主從模式是需要互相之間有長連接的,這樣來確定對方還活着,實現方式是互相之間定時發送心跳數據包。如果發送幾次后對方還是沒有響應的話,就可以認為對方已經掛掉了。

回到客戶端與服務端的模式,有人可能會想,客戶端斷開連接后服務端的 handlerRemoved 等方法不是能感知嗎?還要心跳干什么哪?

真實情況其實非常復雜,比如手機客戶端和服務端進行一個長連接,客戶端沒有退出應用,客戶端開了飛行模型,或者強制關機,此時雙方是感知不到連接已經斷掉了,或者說需要非常長的時間才能感知到,這是我們不想看到的,這時就需要心跳了。

來看一個示例:

其他的代碼還是和上面的一樣,我們就不列出來了,直接進入主題,看不同的地方:

服務端

	   // Netty 為了支持心跳的 IdleStateHandler,空閑狀態監測處理器。
     pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));

來看看 IdleStateHandler 的說明

/*
 * Triggers an IdleStateEvent when a Channel has not performed read, write, or both    
 * operation for a while
 * 當一個 channel 一斷時間沒有進行 read,write 就觸發一個 IdleStateEvent
 */
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
  this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
  //三個參數分別為多長時間沒進行讀,寫或者讀寫操作則觸發 event。
}

觸發 event 后我們編寫這個 event 對應的處理器。

public class MyHandler extends ChannelInboundHandlerAdapter{
  //觸發某個事件后這個方法就會被調用
  //一個 channelhandlerContext 上下文對象,另一個是事件
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    	if(evt instanceof IdleStateEvent){
        IdleStateEvent event = (IdleStateEvent)evt;
        String eventType = null;
        switch(event.state()){
          case READER_IDLE:
            eventType = "讀空閑";
          case WRITER_IDLE:
            eventType = "寫空閑";
          case ALL_IDLE:
            eventType = "讀寫空閑";
        }
      }else{
        //繼續將事件向下一個 handler 傳遞
        ctx.
      }
  }
}

WebSocket 實現與原理分析

WebSocket 是一種規范,是 HTML5 規范的一部分,主要是解決 Http 協議本身存在的問題。可以實現瀏覽器和服務端的長連接,連接頭信息只在建立連接時發送一次。是在 Http 協議之上構建的,比如請求連接其實是一個 Http 請求,只不過里面加了一些 WebSocket 信息。也可以用在非瀏覽器場合,比如 app 上。

Http 是一種無狀態的基於請求和響應的協議,意思是一定是客戶端想服務端發送一個請求,服務端給客戶端一個響應。Http 1.0 在服務端給客戶端響應后連接就斷了。Http 1.1 增加可 keep-alive,服務端可以和客戶端在短時間之內保持一個連接,某個事件之內服務端和客戶端可以復用這個鏈接。在這種情況下,網頁聊天就是實現不了的,服務端的數據推送是無法實現的。

以前有一些假的長連接技術,比如輪詢,缺點和明顯,這里就不細說了。

Http 2.0 實現了長連接,但是這不在我們討論范圍之內。

針對服務端,Tomcat 新版本,Spring,和 Netty 都實現了對 Websocket 的支持。

使用 Netty 對 WebSocket 的支持來實現長連接

其他的部分還是一樣的,先來看服務端的 WebSocketChannelInitializer。

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
   //需要支持 websocket,我們在 initChannel 是做一點改動
   @Override
   protected void initChannel(SocketChannel ch) throws Exception{
      ChannelPipeline pipeline = ch.pipeline();
      //因為 websocket 是基於 http 的,所以要加入 http 相應的編解碼器
      pipeline.addLast(new HttpServerCodec());
      //以塊的方式進行寫的處理器
      pipeline.addLast(new ChunkedWriteHandler());
      // 進行 http 聚合的處理器,將 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者 
      // FullHttpResponse
      //HttpObjectAggregator 在基於 netty 的 http 編程使用的非常多,粘包拆包。
      pipeline.addLast(new HttpObjectAggregator(8192));
      // 針對 websocket 的類,完成 websocket 構建的所有繁重工作,負責握手,以及心跳(close,ping, 
      // pong)的處理, websocket 通過 frame 幀來傳遞數據。
      // BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame,
      // PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。
      // /ws 是 context_path,websocket 協議標准,ws://server:port/context_path
      pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
      pipeline.addLast(new TextWebSocketFrameHandler());
   }
}
// websocket 協議需要用幀來傳遞參數
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
     System.out.println("收到消息:"+ msg.text());
     ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器返回"));
   }
   
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerAdded" + ctx.channel().id.asLongText());
   }
  
   @Override
   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
     System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
   }
  
}

客戶端我們直接通過瀏覽器的原聲 JS 來寫

<script type="text/javascript">
   var socket;
   if(window.WebSocket){
     socket = new WebSocket("ws://localhost:8899/ws");
     socket.onmessage = function(event){
       alert(event.data);
     }
     socket.onopen = function(event){
       alert("連接開啟");
     }
     socket.onclose = function(event){
       alert("連接關閉");
     }
   }else{
     alert("瀏覽器不支持 WebSocket");
   }

   function send(message){
     if(!window.WebSocket){
       return;
     }
     if(socket.readyState == WebSocket.OPEN){
       socket.send(message);
     }
   }
</script>  

我們在瀏覽器中通過 F12 看看 Http 協議升級為 WebSocket 協議的過程。

如果想自己實現一個 RPC 框架,可以參考我的博客:傳送門-> RPC框架


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM