netty-基礎-http/https&ssl&序列化&零拷貝


一. SSL/TLS

Java提供了javax.net.ssl的類SslContext 和SslEngine 可以實現加密解密;
netty用SslHandler實現,內部持有一個SslEngine做實際的工作
SslHandler 數據流圖

  1. 加密的入站數據被 SslHandler 攔截,並被解密
  2. 前面加密的數據被 SslHandler 解密
  3. 平常數據傳過 SslHandler
  4. SslHandler 加密數據並它傳遞出站

SslHandler 使用 ChannelInitializer 添加到 ChannelPipeline

public class SslChannelInitializer extends ChannelInitializer<Channel> {

    private final SslContext context;
    private final boolean startTls;
    public SslChannelInitializer(SslContext context,
    boolean client, boolean startTls) {   //1
        this.context = context;
        this.startTls = startTls;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.newEngine(ch.alloc());  //2
        engine.setUseClientMode(client); //3
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));  //4
    }
}
  • 使用構造函數來傳遞 SSLContext 用於使用(startTls 是否啟用)
  • 從 SslContext 獲得一個新的 SslEngine 。給每個 SslHandler 實例使用一個新的 SslEngine
  • 設置 SslEngine 是 client 或者是 server 模式
  • 添加 SslHandler 到 pipeline 作為第一個處理器

SslHandler作為第一個ChannelHandler,在進入前解密,輸出前加密;
SslHandler有很多加密方法,例如在握手階段兩端相互驗證,商定一個加密方法;
您可以配置 SslHandler 修改其行為或提供 在SSL/TLS 握手完成后發送通知,這樣所有數據都將被加密。 SSL/TLS 握手將自動執行。

setHandshakeTimeout(...) 
setHandshakeTimeoutMillis(...)      設置/獲取超時,超時后握手ChannelFuture被通知失敗
getHandshakeTimeoutMillis()
setCloseNotifyTimeout(...) 
setCloseNotifyTimeoutMillis(...)    失敗后關閉連接
getCloseNotifyTimeoutMillis()
handshakeFuture()                   握手完成返回ChannelFuture
close(...)   

二. Netty HTTP

Decoder, Encoder 和 Codec
netty提供了簡單的編碼、解碼器簡化http協議的開發工作

HTTP Request

  • HTTP Request 第一部分是包含的頭信息
  • HttpContent 里面包含的是數據,可以后續有多個 HttpContent 部分
  • LastHttpContent 標記是 HTTP request 的結束,同時可能包含頭的尾部信息
  • 完整的 HTTP request

HTTP response

  • HTTP response 第一部分是包含的頭信息
  • HttpContent 里面包含的是數據,可以后續有多個 HttpContent 部分
  • LastHttpContent 標記是 HTTP response 的結束,同時可能包含頭的尾部信息
  • 完整的 HTTP response

FullHttpRequest和FullHttpResponse是比較特殊的子類型,所有的http消息都實現自HttpObject接口;

http編碼器和解碼器

HttpRequestEncoder      編碼HttpRequest,HttpContent,LastHttpContent消息到bytes
HttpResponseEncoder     編碼HttpResponse,HttpContent,LastHttpContent消息到bytes
HttpRequestDecoder      譯碼bytes到HttpRequest,HttpContent,LastHttpContent
HttpResponseDecoder     譯碼bytes到HttpResponse,HttpContent,LastHttpContent

只需要添加正確的ChannelHandler到ChannelPipeline中

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {

    private final boolean client;
    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("decoder", new HttpResponseDecoder());  //1
            pipeline.addLast("encoder", new HttpRequestEncoder());  //2
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());  //3
            pipeline.addLast("encoder", new HttpResponseEncoder());  //4
        }
    }
}
  1. client: 添加 HttpResponseDecoder 用於處理來自 server 響應
  2. client: 添加 HttpRequestEncoder 用於發送請求到 server
  3. server: 添加 HttpRequestDecoder 用於接收來自 client 的請求
  4. server: 添加 HttpResponseEncoder 用來發送響應給 client

消息聚合
HTTP請求和響應可以由許多部分組成,需要提供一個聚合器合並消息到FullHttpRequest 和 FullHttpResponse,這樣總是能看到完整的消息;
這樣的消息會緩沖,知道完整后,才發送到下一個ChannelInboundHandler管道,但是不必擔心碎片;

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    private final boolean client;
    public HttpAggregatorInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());  //1
        } else {
            pipeline.addLast("codec", new HttpServerCodec());  //2
        }
        pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));  //3
    }
}
  1. client: 添加 HttpClientCodec
  2. server: 添加 HttpServerCodec 作為我們是 server 模式時
  3. 添加 HttpObjectAggregator 到 ChannelPipeline, 使用最大消息值是 512kb

HTTP 壓縮
使用 HTTP 時建議壓縮數據以減少傳輸流量
Netty 支持“gzip”和“deflate”
提供了兩個ChannelHandler用於壓縮和解壓

客戶端顯示支持加密模式

GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate

服務端需要壓縮

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {

    private final boolean isClient;
    public HttpAggregatorInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec()); //1
            pipeline.addLast("decompressor",new HttpContentDecompressor()); //2
        } else {
            pipeline.addLast("codec", new HttpServerCodec()); //3
            pipeline.addLast("compressor",new HttpContentCompressor()); //4
        }
    }
}
  1. client: 添加 HttpClientCodec
  2. client: 添加 HttpContentDecompressor 用於處理來自服務器的壓縮的內容
  3. server: HttpServerCodec
  4. server: HttpContentCompressor 用於壓縮來自 client 支持的 HttpContentCompressor

三. HTTPS

啟用 HTTPS,只需添加 SslHandler

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean client;
    public HttpsCodecInitializer(SslContext context, boolean client) {
        this.context = context;
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine engine = context.newEngine(ch.alloc());
        pipeline.addFirst("ssl", new SslHandler(engine));  //1

        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());  //2
        } else {
            pipeline.addLast("codec", new HttpServerCodec());  //3
        }
    }
}
  1. 添加 SslHandler 到 pipeline 來啟用 HTTPS
  2. client: 添加 HttpClientCodec
  3. server: 添加 HttpServerCodec ,如果是 server 模式的話

四. WebSocket

允許雙向傳輸,支持文本和二進制,提供了TCP雙向的連接
開始於普通 HTTP ,並“升級”為雙向 WebSocket;

  • Client (HTTP) 與 Server 通訊
  • Server (HTTP) 與 Client 通訊
  • Client 通過 HTTP(s) 來進行 WebSocket 握手,並等待確認
  • 連接協議升級至 WebSocket

需要添加服務端或者客戶端的WebSocket ChannelHandler到pipeline
這個Handler會處理WebSocket定義的消息類型,稱為幀
http://waylau.com/essential-netty-in-action/iamges/Figure 8.4 WebSocket protocol.jpg

BinaryWebSocketFrame            數據幀:二進制數據幀
TextWebSocketFrame              數據幀:文本
ContinuationWebSocketFrame      數據幀:文本或在數據,屬於上一幀
CloseWebSocketFrame             控制幀:關閉
PingWebSocketFrame              控制幀:請求
PongWebSocketFrame              控制幀:響應應

WebSocketServerProtocolHandler在服務器端建立
該類處理協議升級握手以及三個“控制”幀 Close, Ping 和 Pong。
Text 和 Binary 數據幀將被傳遞到下一個處理程序(由你實現)進行處理。

public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(65536),  //1
                new WebSocketServerProtocolHandler("/websocket"),  //2
                new TextFrameHandler(),  //3
                new BinaryFrameHandler(),  //4
                new ContinuationFrameHandler());  //5
    }

    public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            // Handle text frame
        }
    }

    public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
            // Handle binary frame
        }
    }

    public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
            // Handle continuation frame
        }
    }
}
  1. 添加 HttpObjectAggregator 用於提供在握手時聚合 HttpRequest
  2. 添加 WebSocketServerProtocolHandler 用於處理色好給你寄握手如果請求是發送到"/websocket." 端點,當升級完成后,它將會處理Ping, Pong 和 Close 幀
  3. TextFrameHandler 將會處理 TextWebSocketFrames
  4. BinaryFrameHandler 將會處理 BinaryWebSocketFrames
  5. ContinuationFrameHandler 將會處理ContinuationWebSocketFrames

netty in action 11章

五. SPDY

Google開發的基於,降低延遲,不是替代http,是對http的增強;

  • 壓縮報頭
  • 加密所有
  • 多路復用連接
  • 提供支持不同的傳輸優先級

netty in action 12章

六. 空閑連接以及超時

為了及時釋放資源
常見的方法是發送心跳,暴力的方式是直接斷開

IdleStateHandler    時間過長觸發IdleStateEvent,可覆蓋userEventTriggered來處理IdleStateEvent
ReadTimeoutHandler  時間內沒收到數據,拋ReadTimeoutException並關閉channel,可以覆蓋ChannelHandler中的exceptionCaught捕獲
WriteTimeoutHandler ChannelHandler中的exceptionCaught捕獲

利用IdleStateHandler發送心跳例子

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));  //1
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));  //2

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
             ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                     .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  //3
        } else {
            super.userEventTriggered(ctx, evt);  //4
        }
    }
}
  • IdleStateHandler 將通過 IdleStateEvent 調用 userEventTriggered ,如果連接沒有接收或發送數據超過60秒鍾
  • 心跳發送到遠端
  • 發送的心跳並添加一個偵聽器,如果發送操作失敗將關閉連接
  • 事件不是一個 IdleStateEvent 的話,就將它傳遞給下一個處理程序

七. 分隔符協議

例如SMTP、POP3、IMAP、Telnet等等

DelimiterBasedFrameDecoder	接收ByteBuf由一個或多個分隔符拆分,如NUL或換行符
LineBasedFrameDecoder		接收ByteBuf以分割線結束,如"\n"和"\r\n"

使用"\r\n"分隔符的處理

  1. 字節流
  2. 第一幀
  3. 第二幀

LineBasedFrameDecoder例子

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(65 * 1024));   //1
        pipeline.addLast(new FrameHandler());  //2
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {  //3
            // Do something with the frame
        }
    }
}
  1. 添加一個 LineBasedFrameDecoder 用於提取幀並把數據包轉發到下一個管道中的處理程序,在這種情況下就是 FrameHandler
  2. 添加 FrameHandler 用於接收幀
  3. 每次調用都需要傳遞一個單幀的內容

使用DelimiterBasedFrameDecoder可以方便處理特定分隔符作為數據結構體的情況

  1. 傳入的數據流是一系列的幀,每個由換行(“\n”)分隔
  2. 每幀包括一系列項目,每個由單個空格字符分隔
  3. 一幀的內容代表一個“命令”:一個名字后跟一些變量參數

定義類:

  • 類 Cmd 存儲幀的內容,其中一個 ByteBuf 用於存名字,另外一個存參數
  • 類 CmdDecoder 從重寫方法 decode() 中檢索一行,並從其內容中構建一個 Cmd 的實例
  • 類 CmdHandler 從 CmdDecoder 接收解碼 Cmd 對象和對它的一些處理
/*
	1. 添加一個 CmdDecoder 到管道;將提取 Cmd 對象和轉發到在管道中的下一個處理器
	2. 添加 CmdHandler 將接收和處理 Cmd 對象
	3. 命令也是 POJO
	4. super.decode() 通過結束分隔從 ByteBuf 提取幀
	5. frame 是空時,則返回 null
	6. 找到第一個空字符的索引。首先是它的命令名;接下來是參數的順序
	7. 從幀先於索引以及它之后的片段中實例化一個新的 Cmd 對象
	8. 處理通過管道的 Cmd 對象
*/
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CmdDecoder(65 * 1024));//1
        pipeline.addLast(new CmdHandler()); //2
    }

    public static final class Cmd { //3
        private final ByteBuf name;
        private final ByteBuf args;

        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }

        public ByteBuf name() {
            return name;
        }

        public ByteBuf args() {
            return args;
        }
    }

    public static final class CmdDecoder extends LineBasedFrameDecoder {
        public CmdDecoder(int maxLength) {
            super(maxLength);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            ByteBuf frame =  (ByteBuf) super.decode(ctx, buffer); //4
            if (frame == null) {
                return null; //5
            }
            int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' ');  //6
            return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index +1, frame.writerIndex())); //7
        }
    }

    public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
            // Do something with the command  //8
        }
    }
}

八. 基於長度的協議

幀頭中定義了了幀編碼的長度,提供兩個解碼器,用於處理

FixedLengthFrameDecoder			提取固定長度
LengthFieldBasedFrameDecoder	讀取頭部長度並提取幀的長度

FixedLengthFrameDecoder 的操作是提取固定長度每幀8字節
\

  1. 字節流 stream
  2. 4個幀,每個幀8個字節

大部分情況,幀大小寫在編碼頭部,使用LengthFieldBasedFrameDecoder,讀取頭部長度,提取幀長度


LengthFieldBasedFrameDecoder 提供了幾個構造函數覆蓋各種各樣的頭長字段配置情況
使用三個構造函數:maxFrameLength,lengthFieldOffset ,lengthFieldLength

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(65 * 1024));  //1
        pipeline.addLast(new FrameHandler()); //2
    }
    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // Do something with the frame  //3
        }
    }
}
  1. 添加一個 LengthFieldBasedFrameDecoder ,用於提取基於幀編碼長度8個字節的幀。
  2. 添加一個 FrameHandler 用來處理每幀
  3. 處理幀數據

九. 編寫大型數據

寫大數據時,通知ChannelFuture就返回,但是內存中仍然在接收數據,如果這種連接過多,會產生內存耗盡的風險;
使用zero-copy技術,不占用內存;
interface FileRegion支持通過Channel實現zero-copy
通過zero-copy從FileInputStream創建DefaultFileRegion並寫入

FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2

channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause(); //4
            // Do something
        }
    }
});
  • 獲取 FileInputStream
  • 創建一個新的 DefaultFileRegion 用於文件的完整長度
  • 發送 DefaultFileRegion 並且注冊一個 ChannelFutureListener
  • 處理發送失敗
    這種方式只能做文件傳輸

需要寫入內存處理使用ChunkedWriteHandler,支持異步大數據流不引起高內存消耗

ChunkedFile			當你使用平台不支持 zero-copy 或者你需要轉換數據,從文件中一塊一塊的獲取數據
ChunkedNioFile		與 ChunkedFile 類似,處理使用了NIOFileChannel
ChunkedStream		從 InputStream 中一塊一塊的轉移內容
ChunkedNioStream	從 ReadableByteChannel 中一塊一塊的轉移內容

從文件系統賦值到用戶內存需要使用ChunkedWriteHandler

ChunkedFile			當你使用平台不支持 zero-copy 或者你需要轉換數據,從文件中一塊一塊的獲取數據
ChunkedNioFile		與 ChunkedFile 類似,處理使用了NIOFileChannel
ChunkedStream		從 InputStream 中一塊一塊的轉移內容
ChunkedNioStream	從 ReadableByteChannel 中一塊一塊的轉移內容

WriteStreamHandler 從文件一塊一塊的寫入數據作為ChunkedStream,然后通過SslHandler傳播的例子

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
        pipeline.addLast(new ChunkedWriteHandler());//2
        pipeline.addLast(new WriteStreamHandler());//3
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {  //4

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
  • 添加 SslHandler 到 ChannelPipeline.
  • 添加 ChunkedWriteHandler 用來處理作為 ChunkedInput 傳進的數據
  • 當連接建立時,WriteStreamHandler 開始寫文件的內容
  • 當連接建立時,channelActive() 觸發使用 ChunkedInput 來寫文件的內容 (插圖顯示了 FileInputStream;也可以使用任何 InputStream )

需要用戶實現ChunkedInput,安裝ChunkedWriteHandler

十. 序列化數據

JDK 序列化
不需要外部依賴
jdk通過ObjectOutputStream和ObjectInputStream通過原始數據類型和POJO進行序列化和反序列化

CompatibleObjectDecoder		該解碼器使用 JDK 序列化,用於與非 Netty 進行互操作。
CompatibleObjectEncoder		該編碼器使用 JDK 序列化,用於與非 Netty 進行互操作。
ObjectDecoder				基於 JDK 序列化來使用自定義序列化解碼。外部依賴被排除在外時,提供了一個速度提升。否則選擇其他序列化實現
ObjectEncoder				基於 JDK 序列化來使用自定義序列化編碼。外部依賴被排除在外時,提供了一個速度提升。否則選擇其他序列化實現

JBoss Marshalling 序列化
需要外部依賴可以使用 JBoss Marshalling,速度快3倍
修復bug與java.io.Serializable完全兼容

CompatibleMarshallingDecoder	為了與使用 JDK 序列化的端對端間兼容。
CompatibleMarshallingEncoder	為了與使用 JDK 序列化的端對端間兼容。
MarshallingDecoder				使用自定義序列化用於解碼,必須使用
MarshallingEncoder 				使用自定義序列化用於編碼

使用MarshallingDecoder和MarshallingEncoder例子

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,
                                  MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
            // Do something
        }
    }
}

ProtoBuf 序列化
速度快,適合跨語言項目;

ProtobufDecoder					使用 ProtoBuf 來解碼消息
ProtobufEncoder					使用 ProtoBuf 來編碼消息
ProtobufVarint32FrameDecoder	在消息的整型長度域中,通過 "Base 128 Varints"將接收到的 ByteBuf 動態的分割

用法

public class ProtoBufInitializer extends ChannelInitializer<Channel> {

    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new ProtobufDecoder(lite));
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}
  1. 添加 ProtobufVarint32FrameDecoder 用來分割幀
  2. 添加 ProtobufEncoder 用來處理消息的編碼
  3. 添加 ProtobufDecoder 用來處理消息的解碼
  4. 添加 ObjectHandler 用來處理解碼了的消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM