序列化就是將對象的狀態信息轉換成可以存儲或傳輸的形式(例如字節序列)的過程;反序列化則是把這些數據重新還原成對象。
Netty序列化對象一般有以下幾種方式:
JDK
JBoss Marshalling
Protocol Buffers
kryo
JDK
實體類
Request
package com.wk.test.nettyTest.jdk;

import java.io.Serializable;

/**
 * Request payload sent from the client to the server using JDK serialization.
 *
 * <p>A plain mutable bean with three string properties: {@code id}, {@code name}
 * and {@code info}.
 */
public class Request implements Serializable {

    /**
     * Explicit serialization version, matching the convention used by {@code Response}.
     * Without it the JVM derives a value from the class shape, so any recompilation with a
     * changed member list would make client and server reject each other's objects.
     * NOTE: peers still running a build without this field will no longer be stream-compatible.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String info;

    /** @return the request id, or {@code null} if not set */
    public String getId() {
        return id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the sender name, or {@code null} if not set */
    public String getName() {
        return name;
    }

    /** @param name the sender name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text, or {@code null} if not set */
    public String getInfo() {
        return info;
    }

    /** @param info the message text */
    public void setInfo(String info) {
        this.info = info;
    }
}
Response
package com.wk.test.nettyTest.jdk;

import java.io.Serializable;

/**
 * Reply object written back by the server through JDK serialization.
 *
 * <p>Mutable bean exposing {@code id}, {@code name} and {@code responseMessage}.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    // ---- id ----

    /** @return the id copied from the request, or {@code null} if unset */
    public String getId() {
        return this.id;
    }

    /** @param id the id to report back */
    public void setId(String id) {
        this.id = id;
    }

    // ---- name ----

    /** @return the responder name, or {@code null} if unset */
    public String getName() {
        return this.name;
    }

    /** @param name the responder name */
    public void setName(String name) {
        this.name = name;
    }

    // ---- responseMessage ----

    /** @return the reply text, or {@code null} if unset */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    /** @param responseMessage the reply text */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
服務端
NettyServerTest
package com.wk.test.nettyTest.jdk; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyServerTest { private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class); public static void main(String[] args) throws InterruptedException { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); sc.pipeline().addLast(new ObjectEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8090).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
ServerHandler
package com.wk.test.nettyTest.jdk; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getName()); response.setResponseMessage("響應內容" + request.getInfo()); ctx.writeAndFlush(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端
NettyClientTest
package com.wk.test.nettyTest.jdk; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyClientTest { private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class); private static class SingletonHolder { static final NettyClientTest instance = new NettyClientTest(); } public static NettyClientTest getInstance() { return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private NettyClientTest() { group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); sc.pipeline().addLast(new ObjectEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通信,則會關閉響應的通道,主要為減小服務端資源占用) sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect() { try { this.cf = b.connect("127.0.0.1", 8090).sync(); System.out.println("遠程服務器已經連接, 可以進行數據交換.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture() { if (this.cf == null) { this.connect(); } if (!this.cf.channel().isActive()) { 
this.connect(); } return this.cf; } public static void main(String[] args) throws InterruptedException { final NettyClientTest c = NettyClientTest.getInstance(); ChannelFuture future = c.getChannelFuture(); Request request = new Request(); request.setId("1"); request.setName("上杉繪梨衣"); request.setInfo("04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。"); future.channel().writeAndFlush(request).sync(); Request request2 = new Request(); request2.setId("2"); request2.setName("上杉繪梨衣"); request2.setInfo("04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"); future.channel().writeAndFlush(request2); Request request3 = new Request(); request3.setId("3"); request3.setName("上杉繪梨衣"); request3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"); future.channel().writeAndFlush(request3); Request request4 = new Request(); request4.setId("4"); request4.setName("上杉繪梨衣"); request4.setInfo("Sakura最好了。"); future.channel().writeAndFlush(request4); future.channel().closeFuture().sync(); } }
ClientHandler
package com.wk.test.nettyTest.jdk; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
JBoss Marshalling
這種序列化效率比JDK快三倍左右,這里暫不介紹。
protobuf
谷歌開源的一種二進制數據格式,是目前序列化最快的。
相較於json和xml來說,序列化后體積小,傳輸速率快。序列化后不可讀,必須反序列化才可讀。
使用
1.下載
下載地址:https://github.com/google/protobuf/releases
這里下載protoc-3.11.4-win64,windows系統使用的protoc.exe
2.編寫proto格式文件
我們需要編寫一個.proto格式的協議文件,再通過該協議文件來生成java類,具體的語法和規則可以參考官方文檔。這里只舉個例子:
Request.proto
// Schema of the request message used by the Netty protobuf example.
syntax = "proto3";

// Java package of the generated source file.
option java_package = "com.wk.test.nettyTest.proto";
// Name of the generated outer Java class that contains the message classes.
option java_outer_classname = "Request";

// Request message with a numeric id and two text fields.
message MessageRequest{
    // The number after '=' is the field number; it must be unique within the message.
    uint64 id = 1;
    string name = 2;
    string info = 3;
}
syntax = "proto3"; 表示使用的協議語法版本是 proto3
java_package 是生成文件的包路徑
java_outer_classname 是類名
message MessageRequest{
uint64 id = 1; string name = 2; string info = 3; }
消息體內容:
uint64 類型(64 位無符號整數)的 id
string 姓名和內容
后面的數字是字段編號(field number),用於在序列化后的二進制數據中標識該字段,同一個 message 內不可重復
3.生成協議文件對應的消息類
CMD命令到我們下載好的protoc.exe目錄下,執行命令
protoc.exe ./Request.proto --java_out=./
生成Request.java
4.編寫代碼
准備工作已經結束了,我們將.proto文件和生成的java文件放入相對應的程序中就可以開始開發了
開發
pom.xml
<!-- protobuf --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.4</version> </dependency>
這里注意要跟下載的protoc.exe版本一致
實體類
就是生成的java和proto文件
服務端
NettyServerTest
package com.wk.test.nettyTest.proto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyServerTest { private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class); public static void main(String[] args) throws InterruptedException { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ProtobufVarint32FrameDecoder()); sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance())); sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); sc.pipeline().addLast(new ProtobufEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8090).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
ServerHandler
package com.wk.test.nettyTest.proto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request.MessageRequest request = (Request.MessageRequest)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo()); ctx.writeAndFlush(request); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端
NettyClientTest
package com.wk.test.nettyTest.proto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyClientTest { private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class); private static class SingletonHolder { static final NettyClientTest instance = new NettyClientTest(); } public static NettyClientTest getInstance() { return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private NettyClientTest() { group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ProtobufVarint32FrameDecoder()); sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance())); sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); sc.pipeline().addLast(new ProtobufEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通信,則會關閉響應的通道,主要為減小服務端資源占用) sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect() { try { this.cf = b.connect("127.0.0.1", 8090).sync(); 
System.out.println("遠程服務器已經連接, 可以進行數據交換.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture() { if (this.cf == null) { this.connect(); } if (!this.cf.channel().isActive()) { this.connect(); } return this.cf; } public static void main(String[] args) throws InterruptedException { final NettyClientTest c = NettyClientTest.getInstance(); ChannelFuture future = c.getChannelFuture(); Request.MessageRequest.Builder builder =Request.MessageRequest.newBuilder(); builder.setId(1); builder.setName("上杉繪梨衣"); builder.setInfo("04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。"); future.channel().writeAndFlush(builder.build()).sync(); Request.MessageRequest.Builder builder2 =Request.MessageRequest.newBuilder(); builder2.setId(2); builder2.setName("上杉繪梨衣"); builder2.setInfo("04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"); future.channel().writeAndFlush(builder2.build()); Request.MessageRequest.Builder builder3 =Request.MessageRequest.newBuilder(); builder3.setId(3); builder3.setName("上杉繪梨衣"); builder3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"); future.channel().writeAndFlush(builder3.build()); Request.MessageRequest.Builder builder4 =Request.MessageRequest.newBuilder(); builder4.setId(4); builder4.setName("上杉繪梨衣"); builder4.setInfo("Sakura最好了。"); future.channel().writeAndFlush(builder4.build()); future.channel().closeFuture().sync(); } }
ClientHandler
package com.wk.test.nettyTest.proto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Request.MessageRequest request = (Request.MessageRequest)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
優缺點
優點:protobuf是目前序列化最快的沒有之一,較json,xml傳輸體積小,速率高,適合高性能通訊的應用場景
缺點:如果修改消息內容,則需要重新生成java類。proto文件和java文件不對應則報錯。
Kryo(推薦使用)
kryo是一個快速、高效的Java對象圖序列化框架(它並不基於protobuf),dubbo也支持使用它作為序列化方式,速率僅次於protobuf,體積小,且不用通過proto文件生成java類。
pom.xml
<!-- kryo --> <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>5.0.0-RC5</version> </dependency>
實體類 Request
package com.wk.test.nettyTest.kryo;

import java.io.Serializable;

/**
 * Message bean exchanged between the Kryo-based client and server.
 *
 * <p>Holds three mutable string properties: {@code id}, {@code name} and {@code info}.
 */
public class Request implements Serializable {

    private String id;
    private String name;
    private String info;

    // ---- accessors ----

    /** @return the message id, or {@code null} if unset */
    public String getId() {
        return this.id;
    }

    /** @return the sender name, or {@code null} if unset */
    public String getName() {
        return this.name;
    }

    /** @return the message text, or {@code null} if unset */
    public String getInfo() {
        return this.info;
    }

    // ---- mutators ----

    /** @param id the message id */
    public void setId(String id) {
        this.id = id;
    }

    /** @param name the sender name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param info the message text */
    public void setInfo(String info) {
        this.info = info;
    }
}
封裝kryo
因為kryo是線程不安全的,因此我們要對kryo進行一層封裝
Serializer
序列化接口類
package com.wk.test.nettyTest.kryo;

/**
 * Abstraction over an object serializer used by the encoder and decoder classes.
 */
public interface Serializer {

    /**
     * Serializes an object into a byte array.
     *
     * @param object the object to serialize
     * @return the serialized bytes
     */
    byte[] serialize(Object object);

    /**
     * Deserializes a byte array back into an object.
     *
     * @param bytes the serialized bytes
     * @param <T>   the type the caller expects; the result is cast to it
     * @return the deserialized object
     */
    <T> T deserialize(byte[] bytes);
}
KryoSerializer
序列化實現類,通過ThreadLocal 使每個線程都持有一個自己的kryo實例,線程之間不會相互影響。
package com.wk.test.nettyTest.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.BeanSerializer;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * {@link Serializer} backed by Kryo for one fixed class.
 *
 * <p>A separate {@link Kryo} instance is kept per thread through a {@link ThreadLocal};
 * each instance registers {@code clazz} with a {@link BeanSerializer}.
 */
public class KryoSerializer implements Serializer {

    private final Class<?> clazz;

    /**
     * @param clazz the only class this serializer writes and reads
     */
    public KryoSerializer(Class<?> clazz) {
        this.clazz = clazz;
    }

    /** One Kryo instance per thread, created lazily on first use by that thread. */
    private final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.register(clazz, new BeanSerializer(kryo, clazz));
            return kryo;
        }
    };

    private Kryo getKryo() {
        return kryoThreadLocal.get();
    }

    /**
     * Serializes {@code object} using the serializer registered for the configured class.
     *
     * @param object an instance of the configured class, or {@code null}
     * @return the serialized bytes (a null marker when {@code object} is {@code null})
     * @throws IllegalArgumentException if {@code object} is not an instance of the configured class
     */
    @Override
    public byte[] serialize(Object object) {
        if (object != null && !clazz.isInstance(object)) {
            throw new IllegalArgumentException(
                    "Expected an instance of " + clazz.getName() + " but got " + object.getClass().getName());
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        try {
            Kryo kryo = getKryo();
            // Write with the same type that deserialize() reads with. Using object.getClass()
            // would throw a NullPointerException for null and could pick a different serializer.
            kryo.writeObjectOrNull(output, object, clazz);
            output.flush();
            return byteArrayOutputStream.toByteArray();
        } finally {
            IOUtils.closeQuietly(output);
            IOUtils.closeQuietly(byteArrayOutputStream);
        }
    }

    /**
     * Deserializes bytes previously produced by {@link #serialize(Object)}.
     *
     * @param bytes the serialized bytes, or {@code null}
     * @param <T>   the type the caller expects; the result is cast to it unchecked
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     *         or encodes a null value
     */
    @Override
    public <T> T deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        try {
            Kryo kryo = getKryo();
            // Unchecked by design: the interface lets the caller choose T.
            @SuppressWarnings("unchecked")
            T result = (T) kryo.readObjectOrNull(input, clazz);
            return result;
        } finally {
            IOUtils.closeQuietly(input);
            IOUtils.closeQuietly(byteArrayInputStream);
        }
    }
}
KryoSerializerFactory
工廠類,通過傳入class來獲取相對應的序列化工具類
package com.wk.test.nettyTest.kryo;

/**
 * Factory that creates a {@link Serializer} for a given class.
 */
public class KryoSerializerFactory {

    /**
     * Creates a new Kryo-backed serializer bound to {@code clazz}.
     *
     * @param clazz the class the serializer will write and read; must not be {@code null}
     * @return a new {@link KryoSerializer} for {@code clazz}
     * @throws IllegalArgumentException if {@code clazz} is {@code null}
     */
    public static Serializer getSerializer(Class<?> clazz) {
        // Fail fast here instead of on the first serialize/deserialize call.
        if (clazz == null) {
            throw new IllegalArgumentException("clazz must not be null");
        }
        return new KryoSerializer(clazz);
    }
}
編碼、解碼類(也可以稱為序列化、反序列化類)
KryoMsgEncoder
package com.wk.test.nettyTest.kryo; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class KryoMsgEncoder extends MessageToByteEncoder<Request> { private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class); @Override protected void encode(ChannelHandlerContext channelHandlerContext, Request request, ByteBuf byteBuf) throws Exception { byte[] body = serializer.serialize(request); int headLength = body.length; //相當於消息頭 byteBuf.writeInt(headLength); //相當於消息體 byteBuf.writeBytes(body); } }
KryoMsgDecoder
package com.wk.test.nettyTest.kryo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Inbound decoder for length-prefixed Kryo frames: a 4-byte length read with
 * {@code readInt}, followed by that many bytes holding a serialized {@link Request}.
 */
public class KryoMsgDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length header read with {@code readInt()}. */
    private static final int HEADER_LENGTH = 4;

    private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class);

    /**
     * Decodes at most one frame per call. Returns without consuming anything when the
     * buffer does not yet hold a complete frame.
     *
     * @param channelHandlerContext handler context, used to close the channel on a bad header
     * @param byteBuf               accumulated inbound bytes
     * @param list                  receives the decoded {@link Request}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // The header itself may arrive fragmented; readInt() would throw on fewer than 4 bytes.
        if (byteBuf.readableBytes() < HEADER_LENGTH) {
            return;
        }
        // Remember the position so the header can be re-read once more data has arrived.
        byteBuf.markReaderIndex();
        // Header: length of the body.
        int dataLength = byteBuf.readInt();
        if (dataLength <= 0) {
            // A non-positive length means the stream is corrupt: drop the remaining bytes,
            // close the channel and stop, so a negative size never reaches the array allocation.
            byteBuf.skipBytes(byteBuf.readableBytes());
            channelHandlerContext.close();
            return;
        }
        // Body not complete yet: rewind to the header and wait for more data.
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength];
        byteBuf.readBytes(body);
        Request request = serializer.deserialize(body);
        list.add(request);
    }
}
服務端
NettyKryoServer
package com.wk.test.nettyTest.kryo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyKryoServer { private static final Logger logger = LoggerFactory.getLogger(NettyKryoServer.class); public static void main(String[] args) throws InterruptedException { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new KryoMsgDecoder()); sc.pipeline().addLast(new KryoMsgEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new KryoServerHandler()); } }); ChannelFuture cf = b.bind(8090).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
KryoServerHandler
package com.wk.test.nettyTest.kryo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class KryoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo()); ctx.writeAndFlush(request); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端
NettyKryoClient
package com.wk.test.nettyTest.kryo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyKryoClient { private static final Logger logger = LoggerFactory.getLogger(NettyKryoClient.class); private static class SingletonHolder { static final NettyKryoClient instance = new NettyKryoClient(); } public static NettyKryoClient getInstance() { return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private NettyKryoClient() { group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new KryoMsgDecoder()); sc.pipeline().addLast(new KryoMsgEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通信,則會關閉響應的通道,主要為減小服務端資源占用) sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new KryoClientHandler()); } }); } public void connect() { try { this.cf = b.connect("127.0.0.1", 8090).sync(); System.out.println("遠程服務器已經連接, 可以進行數據交換.."); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture() { if (this.cf == null) { this.connect(); } if (!this.cf.channel().isActive()) { this.connect(); } return this.cf; } public static void main(String[] args) throws InterruptedException { final NettyKryoClient c = NettyKryoClient.getInstance(); ChannelFuture future = c.getChannelFuture(); Request request = new 
Request(); request.setId("1"); request.setName("上杉繪梨衣"); request.setInfo("04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。"); future.channel().writeAndFlush(request).sync(); Request request2 = new Request(); request2.setId("2"); request2.setName("上杉繪梨衣"); request2.setInfo("04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"); future.channel().writeAndFlush(request2); Request request3 = new Request(); request3.setId("3"); request3.setName("上杉繪梨衣"); request3.setInfo("04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"); future.channel().writeAndFlush(request3); Request request4 = new Request(); request4.setId("4"); request4.setName("上杉繪梨衣"); request4.setInfo("Sakura最好了。"); future.channel().writeAndFlush(request4); future.channel().closeFuture().sync(); } }
KryoClientHandler
package com.wk.test.nettyTest.kryo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class KryoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Request resp = (Request)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getInfo()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }