最近在重新看netty,在這里總結一些netty的一些常用的使用方式,類似於模板,方便速查。
以netty 4.1.x的API作記錄,不同版本可能API有略微差異,需要注意netty5被廢棄掉了(辨別方式是看SimpleChannelInboundHandler是否有一個messageReceived方法 有的話就是5),netty3是以org.jboss開頭為包名。
統一模板
大多數情況下使用netty的步驟是定義好EventLoopGroup,定義好Bootstrap(ServerBootstrap)以及使用的channel類型(一般就是NioSocketChannel,服務端是NioServerSocketChannel)。
剩下是業務相關,使用的ChannelInitializer和具體的handler。
主要就是2+N(N為自定義的handler數量)個類。
服務端啟動模板(也可以不區分boss和worker 用一個):
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyChannelInitializer());
ChannelFuture future = serverBootstrap.bind(8999).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
客戶端啟動模板:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new MyChannelInitializer());
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
ChannelInitializer模板(繼承ChannelInitializer即可):
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(...);
}
}
接下來的例子就以這個模板為骨架,主要涉及到初始化器的代碼,啟動代碼一致。
處理Http請求
netty自帶了對用的codec類比較方便。
pipeline.addLast("httpServerCodec", new HttpServerCodec());
自己實現的handler最簡單的方式用SimpleChannelInboundHandler接收HttpRequest方法即可
class MyHttpHandler extends SimpleChannelInboundHandler<HttpRequest>
這里簡單說下SimpleChannelInboundHandler這個類,他是簡化處理接受信息並處理的一個類,主要做兩件事。
第一件事是根據泛型決定是否處理這個消息,能夠處理就自己處理,不行就交給下一個(可以參考acceptInboundMessage和channelRead方法)。
第二件事是消息的自動回收(有構造函數支持 默認是true),消息的引用計數會減一(所以在其他地方還要使用記得再retain一下)。
使用它可以節省很多冗余代碼的編寫。
一個簡單例子:
public class MyHttpHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
System.out.println(msg.getClass());
System.out.println(msg.uri());
System.out.println(msg.method().name());
System.out.println(ctx.channel().remoteAddress());
System.out.println("headers:");
msg.headers().forEach(System.out::println);
ByteBuf buf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
ctx.writeAndFlush(response);
// ctx.channel().close();
}
}
不過只用這個handler並不能拿到content,還需要配合ChunkedWriteHandler和HttpObjectAggregator得到FullHttpRequest對象。
處理WebSocket請求
只需要在上面的基礎上增加一個WebSocketServerProtocolHandler即可,完整如下:
pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(8096));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
自己的處理器可以接收並處理WebSocketFrame的子類。
自定義文本協議
netty提供了幾個比較方便的用於自定義文本協議的編解碼器。
基於長度
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyHandler());
上述參數的意義直接搬了文檔:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
基於分隔符
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
第二個參數需要一個ByteBuf[],自己指定下分隔符即可。
注意使用這種發消息的時候要帶上那個分隔符,不然會處理失敗(被當做未結束)。
廣播實現
利用了netty提供的一個很便利的類,ChannelGroup.
首先要了解一下Channel生命周期函數的調用:
channel added -> channel registered -> channel active -> channel inactive -> channel unregistered -> channel removed
我們可以定義一個靜態(保證共享和唯一)的ChannelGroup在channel added的時候把對應channel增加到ChannelGroup中即可(但不需要自己移除,這一點他自己實現了)
然后利用它的寫方法就可以實現廣播,或者forEach做下過濾做多播也可以。
這個實現和用什么協議無關,主要涉及到ChannelGroup使用。
心跳
netty自帶的IdleStateHandler,超時后會向下一個handler發出IdleStateEvent消息,接收並處理即可。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
ctx.channel().writeAndFlush("超時").addListener((ChannelFuture ch) -> ch.channel().close());
}
}
它細分為3中類型的超時,讀、寫、讀寫,通過IdleStateEvent的state屬性可以獲取,可以單獨判斷。
其他的一些模式會在之后復習和使用過程中不斷補完~
2017年10月08日00點53分 init
2017年10月10日10點02分 更新websocket
