背景
作為網絡傳輸框架,免不了傳輸對象,對象在傳輸之前就要序列化,這個序列化的過程就是編碼過程。接收方收到編碼後的數據就需要解碼,還原傳輸的數據。
Java序列化就是一種編解碼技術,序列化的目的有兩個,一是進行網絡傳輸,二是對象持久化。
但是Java的序列化缺點很多,如無法跨語言、序列化後碼流太大、序列化性能太低。
主流的序列化框架:
JBoss的Marshalling包
google的Protobuf
Kryo(獨立的Java序列化框架,並非基於Protobuf)
MessagePack框架
JBoss Marshalling的實現
代碼示例:

public class Server { public static void main(String[] args) throws Exception { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) // 設置日志 .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Req req = (Req) msg; System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage()); byte[] attachment = GzipUtils.ungzip(req.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg"; FileOutputStream fos = new FileOutputStream(path); fos.write(attachment); fos.close(); Resp resp = new Resp(); resp.setId(req.getId()); resp.setName("resp" + req.getId()); resp.setResponseMessage("響應內容" + req.getId()); ctx.writeAndFlush(resp);// .addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = 
new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for (int i = 0; i < 5; i++) { Req req = new Req(); req.setId("" + i); req.setName("pro" + i); req.setRequestMessage("數據信息" + i); String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; File file = new File(path); FileInputStream in = new FileInputStream(file); byte[] data = new byte[in.available()]; in.read(data); in.close(); req.setAttachment(GzipUtils.gzip(data)); cf.channel().writeAndFlush(req); } cf.channel().closeFuture().sync(); group.shutdownGracefully(); } } public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Resp resp = (Resp) msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } /** * Marshalling工廠 * */ public final class MarshallingCodeCFactory { /** * 創建Jboss Marshalling解碼器MarshallingDecoder * * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { // 首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); // 
創建了MarshallingConfiguration對象,配置了版本號為5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); // 根據marshallerFactory和configuration創建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); // 構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化后的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } /** * 創建Jboss Marshalling編碼器MarshallingEncoder * * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); // 構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化為二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } } public class Req implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String requestMessage; private byte[] attachment; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } public byte[] getAttachment() { return attachment; } public void setAttachment(byte[] attachment) { this.attachment = attachment; } } public class Resp implements Serializable { private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return 
name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
工具類:

/**
 * GZIP compression helpers for in-memory byte arrays.
 */
public class GzipUtils {

    /**
     * Compresses the given bytes with GZIP.
     *
     * @param data the bytes to compress
     * @return the GZIP-compressed bytes
     * @throws Exception if compression fails
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the compressed data and frees the deflater,
        // including when write() throws.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP-compressed bytes.
     *
     * @param data the GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws Exception if the input is not valid GZIP data or decompression fails
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        // try-with-resources frees the inflater even when the input is corrupt.
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
            return bos.toByteArray();
        }
    }

    /**
     * Manual round-trip check: compresses {@code <user.dir>/sources/006.jpg}, decompresses it
     * again, prints the three sizes and writes the result to {@code <user.dir>/receive/006.jpg}.
     *
     * @param args unused
     * @throws Exception if reading, compressing or writing fails
     */
    public static void main(String[] args) throws Exception {
        // Read the file
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar
                + "006.jpg";
        File file = new File(readPath);
        // Reads the complete file; available() + a single read() is not guaranteed to.
        byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
        System.out.println("文件原始大小:" + data.length);

        // Test compression
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("壓縮之后大小:" + ret1.length);

        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("還原之后大小:" + ret2.length);

        // Write the file
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar
                + "006.jpg";
        try (FileOutputStream fos = new FileOutputStream(writePath)) {
            fos.write(ret2);
        }
    }
}
UDP的實現
代碼示例:

public class Server { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true) .handler(new ServerHandler()); b.bind(port).sync().channel().closeFuture().await(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new Server().run(8765); new Server().run(8764); } } public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { // 諺語列表 private static final String[] DICTIONARY = { "只要功夫深,鐵棒磨成針。", "舊時王謝堂前燕,飛入尋常百姓家。", "洛陽親友如相問,一片冰心在玉壺。", "一寸光陰一寸金,寸金難買寸光陰。", "老驥伏櫪,志在千里。烈士暮年,壯心不已!" }; private String nextQuote() { int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length); return DICTIONARY[quoteId]; } @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { String req = packet.content().toString(CharsetUtil.UTF_8); System.out.println(req); if ("諺語字典查詢?".equals(req)) { ctx.writeAndFlush( new DatagramPacket(Unpooled.copiedBuffer("諺語查詢結果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } } public class Client { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true) .handler(new ClientHandler()); Channel ch = b.bind(0).sync().channel(); // 向網段內的所有機器廣播UDP消息 ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語字典查詢?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync(); if (!ch.closeFuture().await(15000)) { System.out.println("查詢超時!"); } } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new 
Client().run(8765); } } public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { String response = msg.content().toString(CharsetUtil.UTF_8); if (response.startsWith("諺語查詢結果: ")) { System.out.println(response); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }