本篇將自定義 編碼解碼器,對數據傳輸過程進行“入站解碼,出站編碼”。
- 服務端接收的是字節數據,通過“入站解碼”,得到知道格式的數據;
- 服務器發送指定格式數據通過 “出站編碼” 轉換成字節數據,然后發送給客戶端;
- 客戶端類似;
- ChannelPipeLine 管理一系列 ChannelHandler,入站消息解碼后轉發給下一個 handler 進行處理
案例需求:客戶端或服務器發送 Long 類型數據,出站編碼成字節數據,入站解碼讀取對方發送的消息

編碼器 MyLongToByteEncoder
package com.oy.inboundandoutbound;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encoder 被調用. msg: " + msg);
out.writeLong(msg);
}
}
解碼器 MyByteToLongDecoder
package com.oy.inboundandoutbound;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* decode() 會根據接收的數據,被調用多次,知道確定沒有新的元素添加到list,
* 或者是 ByteBuf 沒有更多的可讀字節為止。
* 如果 list 不為空,就會將 list 的內容傳遞給下一個 handler
* @param ctx 上下文對象
* @param in 入站后的 ByteBuf
* @param out 將解碼后的數據傳遞給下一個 handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// long 類型 為 8 字節
if (in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
Server
package com.oy.inboundandoutbound.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boss, work)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerChannelInitializer());
ChannelFuture future = serverBootstrap.bind(8004).sync();
System.out.println("server started and listen " + 8004);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
package com.oy.inboundandoutbound.server;
import com.oy.inboundandoutbound.MyByteToLongDecoder;
import com.oy.inboundandoutbound.MyLongToByteEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
/* 向管道加入處理器 */
ChannelPipeline pipeline = socketChannel.pipeline();
// 入站的 handler 進行解碼
pipeline.addLast("decoder", new MyByteToLongDecoder());
// 添加一個出站的 handler 對數據進行編碼
pipeline.addLast("encoder", new MyLongToByteEncoder());
// 添加自定義的處理器
pipeline.addLast("MyServerHandler", new MyServerHandler());
}
}
package com.oy.inboundandoutbound.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("從客戶端讀到的數據:" + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 服務器返回 long 類型數據
ctx.writeAndFlush(654321L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
Client
package com.oy.inboundandoutbound.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientChannelInitializer());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8004).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
package com.oy.inboundandoutbound.client;
import com.oy.inboundandoutbound.MyByteToLongDecoder;
import com.oy.inboundandoutbound.MyLongToByteEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClientChannelInitializer extends ChannelInitializer<NioSocketChannel> {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 入站的 handler 進行解碼
pipeline.addLast("decoder", new MyByteToLongDecoder());
// 添加一個出站的 handler 對數據進行編碼
pipeline.addLast("encoder", new MyLongToByteEncoder());
// 添加自定義 handler,處理業務邏輯
pipeline.addLast(new MyClientHandler());
}
}
package com.oy.inboundandoutbound.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long msg) throws Exception {
// 客戶端讀取服務器發送的 long 類型數據
System.out.println("客戶端讀取服務器發送的, msg:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客戶端發送 long 類型數據
ctx.writeAndFlush(123456L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
啟動服務器和客戶端程序, 控制台打印結果:


