一. SSL/TLS
Java提供了javax.net.ssl
的類SslContext 和SslEngine 可以實現加密解密;
netty用SslHandler實現,內部持有一個SslEngine做實際的工作
SslHandler 數據流圖
- 加密的入站數據被 SslHandler 攔截,並被解密
- 前面加密的數據被 SslHandler 解密
- 平常數據傳過 SslHandler
- SslHandler 加密數據並它傳遞出站
SslHandler 使用 ChannelInitializer 添加到 ChannelPipeline
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslChannelInitializer(SslContext context,
boolean client, boolean startTls) { //1
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc()); //2
engine.setUseClientMode(client); //3
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls)); //4
}
}
- 使用構造函數來傳遞 SSLContext 用於使用(startTls 是否啟用)
- 從 SslContext 獲得一個新的 SslEngine 。給每個 SslHandler 實例使用一個新的 SslEngine
- 設置 SslEngine 是 client 或者是 server 模式
- 添加 SslHandler 到 pipeline 作為第一個處理器
SslHandler作為第一個ChannelHandler,在進入前解密,輸出前加密;
SslHandler有很多加密方法,例如在握手階段兩端相互驗證,商定一個加密方法;
您可以配置 SslHandler 修改其行為或提供 在SSL/TLS 握手完成后發送通知,這樣所有數據都將被加密。 SSL/TLS 握手將自動執行。
setHandshakeTimeout(...)
setHandshakeTimeoutMillis(...) 設置/獲取超時,超時后握手ChannelFuture被通知失敗
getHandshakeTimeoutMillis()
setCloseNotifyTimeout(...)
setCloseNotifyTimeoutMillis(...) 失敗后關閉連接
getCloseNotifyTimeoutMillis()
handshakeFuture() 握手完成返回ChannelFuture
close(...)
二. Netty HTTP
Decoder, Encoder 和 Codec
netty提供了簡單的編碼、解碼器簡化http協議的開發工作
HTTP Request
- HTTP Request 第一部分是包含的頭信息
- HttpContent 里面包含的是數據,可以后續有多個 HttpContent 部分
- LastHttpContent 標記是 HTTP request 的結束,同時可能包含頭的尾部信息
- 完整的 HTTP request
HTTP response
- HTTP response 第一部分是包含的頭信息
- HttpContent 里面包含的是數據,可以后續有多個 HttpContent 部分
- LastHttpContent 標記是 HTTP response 的結束,同時可能包含頭的尾部信息
- 完整的 HTTP response
FullHttpRequest和FullHttpResponse是比較特殊的子類型,所有的http消息都實現自HttpObject接口;
http編碼器和解碼器
HttpRequestEncoder 編碼HttpRequest,HttpContent,LastHttpContent消息到bytes
HttpResponseEncoder 編碼HttpResponse,HttpContent,LastHttpContent消息到bytes
HttpRequestDecoder 譯碼bytes到HttpRequest,HttpContent,LastHttpContent
HttpResponseDecoder 譯碼bytes到HttpResponse,HttpContent,LastHttpContent
只需要添加正確的ChannelHandler到ChannelPipeline中
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("decoder", new HttpResponseDecoder()); //1
pipeline.addLast("encoder", new HttpRequestEncoder()); //2
} else {
pipeline.addLast("decoder", new HttpRequestDecoder()); //3
pipeline.addLast("encoder", new HttpResponseEncoder()); //4
}
}
}
- client: 添加 HttpResponseDecoder 用於處理來自 server 響應
- client: 添加 HttpRequestEncoder 用於發送請求到 server
- server: 添加 HttpRequestDecoder 用於接收來自 client 的請求
- server: 添加 HttpResponseEncoder 用來發送響應給 client
消息聚合
HTTP請求和響應可以由許多部分組成,需要提供一個聚合器合並消息到FullHttpRequest 和 FullHttpResponse,這樣總是能看到完整的消息;
這樣的消息會緩沖,知道完整后,才發送到下一個ChannelInboundHandler管道,但是不必擔心碎片;
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpAggregatorInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("codec", new HttpClientCodec()); //1
} else {
pipeline.addLast("codec", new HttpServerCodec()); //2
}
pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); //3
}
}
- client: 添加 HttpClientCodec
- server: 添加 HttpServerCodec 作為我們是 server 模式時
- 添加 HttpObjectAggregator 到 ChannelPipeline, 使用最大消息值是 512kb
HTTP 壓縮
使用 HTTP 時建議壓縮數據以減少傳輸流量
Netty 支持“gzip”和“deflate”
提供了兩個ChannelHandler用於壓縮和解壓
客戶端顯示支持加密模式
GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate
服務端需要壓縮
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec()); //1
pipeline.addLast("decompressor",new HttpContentDecompressor()); //2
} else {
pipeline.addLast("codec", new HttpServerCodec()); //3
pipeline.addLast("compressor",new HttpContentCompressor()); //4
}
}
}
- client: 添加 HttpClientCodec
- client: 添加 HttpContentDecompressor 用於處理來自服務器的壓縮的內容
- server: HttpServerCodec
- server: HttpContentCompressor 用於壓縮來自 client 支持的 HttpContentCompressor
三. HTTPS
啟用 HTTPS,只需添加 SslHandler
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean client;
public HttpsCodecInitializer(SslContext context, boolean client) {
this.context = context;
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine)); //1
if (client) {
pipeline.addLast("codec", new HttpClientCodec()); //2
} else {
pipeline.addLast("codec", new HttpServerCodec()); //3
}
}
}
- 添加 SslHandler 到 pipeline 來啟用 HTTPS
- client: 添加 HttpClientCodec
- server: 添加 HttpServerCodec ,如果是 server 模式的話
四. WebSocket
允許雙向傳輸,支持文本和二進制,提供了TCP雙向的連接
開始於普通 HTTP ,並“升級”為雙向 WebSocket;
- Client (HTTP) 與 Server 通訊
- Server (HTTP) 與 Client 通訊
- Client 通過 HTTP(s) 來進行 WebSocket 握手,並等待確認
- 連接協議升級至 WebSocket
需要添加服務端或者客戶端的WebSocket ChannelHandler到pipeline
這個Handler會處理WebSocket定義的消息類型,稱為幀
http://waylau.com/essential-netty-in-action/iamges/Figure 8.4 WebSocket protocol.jpg
BinaryWebSocketFrame 數據幀:二進制數據幀
TextWebSocketFrame 數據幀:文本
ContinuationWebSocketFrame 數據幀:文本或在數據,屬於上一幀
CloseWebSocketFrame 控制幀:關閉
PingWebSocketFrame 控制幀:請求
PongWebSocketFrame 控制幀:響應應
WebSocketServerProtocolHandler在服務器端建立
該類處理協議升級握手以及三個“控制”幀 Close, Ping 和 Pong。
Text 和 Binary 數據幀將被傳遞到下一個處理程序(由你實現)進行處理。
public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536), //1
new WebSocketServerProtocolHandler("/websocket"), //2
new TextFrameHandler(), //3
new BinaryFrameHandler(), //4
new ContinuationFrameHandler()); //5
}
public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
- 添加 HttpObjectAggregator 用於提供在握手時聚合 HttpRequest
- 添加 WebSocketServerProtocolHandler 用於處理色好給你寄握手如果請求是發送到"/websocket." 端點,當升級完成后,它將會處理Ping, Pong 和 Close 幀
- TextFrameHandler 將會處理 TextWebSocketFrames
- BinaryFrameHandler 將會處理 BinaryWebSocketFrames
- ContinuationFrameHandler 將會處理ContinuationWebSocketFrames
netty in action 11章
五. SPDY
Google開發的基於,降低延遲,不是替代http,是對http的增強;
- 壓縮報頭
- 加密所有
- 多路復用連接
- 提供支持不同的傳輸優先級
netty in action 12章
六. 空閑連接以及超時
為了及時釋放資源
常見的方法是發送心跳,暴力的方式是直接斷開
IdleStateHandler 時間過長觸發IdleStateEvent,可覆蓋userEventTriggered來處理IdleStateEvent
ReadTimeoutHandler 時間內沒收到數據,拋ReadTimeoutException並關閉channel,可以覆蓋ChannelHandler中的exceptionCaught捕獲
WriteTimeoutHandler ChannelHandler中的exceptionCaught捕獲
利用IdleStateHandler發送心跳例子
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); //1
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); //2
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); //3
} else {
super.userEventTriggered(ctx, evt); //4
}
}
}
- IdleStateHandler 將通過 IdleStateEvent 調用 userEventTriggered ,如果連接沒有接收或發送數據超過60秒鍾
- 心跳發送到遠端
- 發送的心跳並添加一個偵聽器,如果發送操作失敗將關閉連接
- 事件不是一個 IdleStateEvent 的話,就將它傳遞給下一個處理程序
七. 分隔符協議
例如SMTP、POP3、IMAP、Telnet等等
DelimiterBasedFrameDecoder 接收ByteBuf由一個或多個分隔符拆分,如NUL或換行符
LineBasedFrameDecoder 接收ByteBuf以分割線結束,如"\n"和"\r\n"
使用"\r\n"分隔符的處理
- 字節流
- 第一幀
- 第二幀
LineBasedFrameDecoder例子
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(65 * 1024)); //1
pipeline.addLast(new FrameHandler()); //2
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //3
// Do something with the frame
}
}
}
- 添加一個 LineBasedFrameDecoder 用於提取幀並把數據包轉發到下一個管道中的處理程序,在這種情況下就是 FrameHandler
- 添加 FrameHandler 用於接收幀
- 每次調用都需要傳遞一個單幀的內容
使用DelimiterBasedFrameDecoder可以方便處理特定分隔符作為數據結構體的情況
- 傳入的數據流是一系列的幀,每個由換行(“\n”)分隔
- 每幀包括一系列項目,每個由單個空格字符分隔
- 一幀的內容代表一個“命令”:一個名字后跟一些變量參數
定義類:
- 類 Cmd 存儲幀的內容,其中一個 ByteBuf 用於存名字,另外一個存參數
- 類 CmdDecoder 從重寫方法 decode() 中檢索一行,並從其內容中構建一個 Cmd 的實例
- 類 CmdHandler 從 CmdDecoder 接收解碼 Cmd 對象和對它的一些處理
/*
1. 添加一個 CmdDecoder 到管道;將提取 Cmd 對象和轉發到在管道中的下一個處理器
2. 添加 CmdHandler 將接收和處理 Cmd 對象
3. 命令也是 POJO
4. super.decode() 通過結束分隔從 ByteBuf 提取幀
5. frame 是空時,則返回 null
6. 找到第一個空字符的索引。首先是它的命令名;接下來是參數的順序
7. 從幀先於索引以及它之后的片段中實例化一個新的 Cmd 對象
8. 處理通過管道的 Cmd 對象
*/
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CmdDecoder(65 * 1024));//1
pipeline.addLast(new CmdHandler()); //2
}
public static final class Cmd { //3
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf name() {
return name;
}
public ByteBuf args() {
return args;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); //4
if (frame == null) {
return null; //5
}
int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' '); //6
return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index +1, frame.writerIndex())); //7
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
// Do something with the command //8
}
}
}
八. 基於長度的協議
幀頭中定義了了幀編碼的長度,提供兩個解碼器,用於處理
FixedLengthFrameDecoder 提取固定長度
LengthFieldBasedFrameDecoder 讀取頭部長度並提取幀的長度
FixedLengthFrameDecoder 的操作是提取固定長度每幀8字節
\
- 字節流 stream
- 4個幀,每個幀8個字節
大部分情況,幀大小寫在編碼頭部,使用LengthFieldBasedFrameDecoder,讀取頭部長度,提取幀長度
LengthFieldBasedFrameDecoder 提供了幾個構造函數覆蓋各種各樣的頭長字段配置情況
使用三個構造函數:maxFrameLength,lengthFieldOffset ,lengthFieldLength
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(65 * 1024)); //1
pipeline.addLast(new FrameHandler()); //2
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// Do something with the frame //3
}
}
}
- 添加一個 LengthFieldBasedFrameDecoder ,用於提取基於幀編碼長度8個字節的幀。
- 添加一個 FrameHandler 用來處理每幀
- 處理幀數據
九. 編寫大型數據
寫大數據時,通知ChannelFuture就返回,但是內存中仍然在接收數據,如果這種連接過多,會產生內存耗盡的風險;
使用zero-copy技術,不占用內存;
interface FileRegion
支持通過Channel實現zero-copy
通過zero-copy從FileInputStream創建DefaultFileRegion並寫入
FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2
channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause(); //4
// Do something
}
}
});
- 獲取 FileInputStream
- 創建一個新的 DefaultFileRegion 用於文件的完整長度
- 發送 DefaultFileRegion 並且注冊一個 ChannelFutureListener
- 處理發送失敗
這種方式只能做文件傳輸
需要寫入內存處理使用ChunkedWriteHandler,支持異步大數據流不引起高內存消耗
ChunkedFile 當你使用平台不支持 zero-copy 或者你需要轉換數據,從文件中一塊一塊的獲取數據
ChunkedNioFile 與 ChunkedFile 類似,處理使用了NIOFileChannel
ChunkedStream 從 InputStream 中一塊一塊的轉移內容
ChunkedNioStream 從 ReadableByteChannel 中一塊一塊的轉移內容
從文件系統賦值到用戶內存需要使用ChunkedWriteHandler
ChunkedFile 當你使用平台不支持 zero-copy 或者你需要轉換數據,從文件中一塊一塊的獲取數據
ChunkedNioFile 與 ChunkedFile 類似,處理使用了NIOFileChannel
ChunkedStream 從 InputStream 中一塊一塊的轉移內容
ChunkedNioStream 從 ReadableByteChannel 中一塊一塊的轉移內容
WriteStreamHandler 從文件一塊一塊的寫入數據作為ChunkedStream,然后通過SslHandler傳播的例子
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
pipeline.addLast(new ChunkedWriteHandler());//2
pipeline.addLast(new WriteStreamHandler());//3
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
- 添加 SslHandler 到 ChannelPipeline.
- 添加 ChunkedWriteHandler 用來處理作為 ChunkedInput 傳進的數據
- 當連接建立時,WriteStreamHandler 開始寫文件的內容
- 當連接建立時,channelActive() 觸發使用 ChunkedInput 來寫文件的內容 (插圖顯示了 FileInputStream;也可以使用任何 InputStream )
需要用戶實現ChunkedInput,安裝ChunkedWriteHandler
十. 序列化數據
JDK 序列化
不需要外部依賴
jdk通過ObjectOutputStream和ObjectInputStream通過原始數據類型和POJO進行序列化和反序列化
CompatibleObjectDecoder 該解碼器使用 JDK 序列化,用於與非 Netty 進行互操作。
CompatibleObjectEncoder 該編碼器使用 JDK 序列化,用於與非 Netty 進行互操作。
ObjectDecoder 基於 JDK 序列化來使用自定義序列化解碼。外部依賴被排除在外時,提供了一個速度提升。否則選擇其他序列化實現
ObjectEncoder 基於 JDK 序列化來使用自定義序列化編碼。外部依賴被排除在外時,提供了一個速度提升。否則選擇其他序列化實現
JBoss Marshalling 序列化
需要外部依賴可以使用 JBoss Marshalling,速度快3倍
修復bug與java.io.Serializable
完全兼容
CompatibleMarshallingDecoder 為了與使用 JDK 序列化的端對端間兼容。
CompatibleMarshallingEncoder 為了與使用 JDK 序列化的端對端間兼容。
MarshallingDecoder 使用自定義序列化用於解碼,必須使用
MarshallingEncoder 使用自定義序列化用於編碼
使用MarshallingDecoder和MarshallingEncoder例子
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,
MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
pipeline.addLast(new MarshallingEncoder(marshallerProvider));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
// Do something
}
}
}
ProtoBuf 序列化
速度快,適合跨語言項目;
ProtobufDecoder 使用 ProtoBuf 來解碼消息
ProtobufEncoder 使用 ProtoBuf 來編碼消息
ProtobufVarint32FrameDecoder 在消息的整型長度域中,通過 "Base 128 Varints"將接收到的 ByteBuf 動態的分割
用法
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(lite));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Do something with the object
}
}
}
- 添加 ProtobufVarint32FrameDecoder 用來分割幀
- 添加 ProtobufEncoder 用來處理消息的編碼
- 添加 ProtobufDecoder 用來處理消息的解碼
- 添加 ObjectHandler 用來處理解碼了的消息