Netty(四、序列化與反序列化)


序列化就是將對象的狀態信息轉換成可以存儲或傳輸的形式的過程;反序列化則是把這種形式重新還原為對象的過程。

Netty序列化對象一般有以下幾種方式:

JDK

JBoss Marshalling

Protocol Buffers

kryo

JDK

實體類

Request

package com.wk.test.nettyTest.jdk;

import java.io.Serializable;

/**
 * Request message exchanged between the JDK-serialization client and server.
 *
 * <p>Plain mutable bean with three string properties: {@code id}, {@code name} and {@code info}.
 */
public class Request implements Serializable {

    // Pin the serialized form explicitly (as Response does) so that client and server
    // do not depend on the compiler-derived default UID matching on both sides.
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String info;

    /** @return the request id, or {@code null} if none was set */
    public String getId() {
        return id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the sender name, or {@code null} if none was set */
    public String getName() {
        return name;
    }

    /** @param name the sender name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text, or {@code null} if none was set */
    public String getInfo() {
        return info;
    }

    /** @param info the message text */
    public void setInfo(String info) {
        this.info = info;
    }
}

Response

package com.wk.test.nettyTest.jdk;

import java.io.Serializable;

/**
 * Response message returned by the JDK-serialization server.
 *
 * <p>Mutable bean carrying an {@code id}, a {@code name} and a {@code responseMessage}.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    // ---- accessors ----

    /** @return the id, or {@code null} if unset */
    public String getId() {
        return this.id;
    }

    /** @return the name, or {@code null} if unset */
    public String getName() {
        return this.name;
    }

    /** @return the response text, or {@code null} if unset */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    // ---- mutators ----

    /** @param id the id to store */
    public void setId(final String id) {
        this.id = id;
    }

    /** @param name the name to store */
    public void setName(final String name) {
        this.name = name;
    }

    /** @param responseMessage the response text to store */
    public void setResponseMessage(final String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

服務端 

NettyServerTest

package com.wk.test.nettyTest.jdk;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                //設置日志
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        sc.pipeline().addLast(new ObjectEncoder());
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture cf = b.bind(8090).sync();

        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

ServerHandler

package com.wk.test.nettyTest.jdk;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler for the JDK-serialization example: prints each incoming
 * {@code Request} and answers with a {@code Response} derived from it.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the request and writes back a response with the same id.
     *
     * @param ctx the handler context used to write the response
     * @param msg the decoded message; expected to be a {@code Request}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request)msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getName());
        response.setResponseMessage("響應內容" + request.getInfo());
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: responses are flushed in channelRead.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Server : closing channel after error: " + cause);
        ctx.close();
    }

}

客戶端

NettyClientTest

package com.wk.test.nettyTest.jdk;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton Netty client for the JDK-serialization example.
 *
 * <p>Connects to 127.0.0.1:8090, sends four {@code Request} objects and waits until the
 * channel is closed.
 */
public class NettyClientTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);

    /** Holder class so the singleton is created on first call to {@link #getInstance()}. */
    private static class SingletonHolder {
        static final NettyClientTest instance = new NettyClientTest();
    }

    /** @return the shared client instance */
    public static NettyClientTest getInstance() {
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf;

    private NettyClientTest() {
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        // Bootstrap keeps only ONE handler: a separate .handler(new LoggingHandler(..))
                        // call would be overwritten by this initializer, so install logging here.
                        sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        sc.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        sc.pipeline().addLast(new ObjectEncoder());
                        // Timeout handler: closes the channel when nothing is read for the given
                        // time, mainly to reduce resource usage on the server side.
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
    }

    /**
     * Connects to the server and stores the resulting future. Failures are logged and leave
     * the previously stored future (possibly {@code null}) untouched.
     */
    public void connect() {
        try {
            this.cf = b.connect("127.0.0.1", 8090).sync();
            System.out.println("遠程服務器已經連接, 可以進行數據交換..");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while connecting to 127.0.0.1:8090", e);
        } catch (Exception e) {
            logger.error("Failed to connect to 127.0.0.1:8090", e);
        }
    }

    /**
     * Returns the connection future, (re)connecting first when there is no active channel.
     *
     * @return the future of the most recent successful connect
     * @throws IllegalStateException if no connection could ever be established
     */
    public ChannelFuture getChannelFuture() {

        if (this.cf == null || !this.cf.channel().isActive()) {
            this.connect();
        }
        if (this.cf == null) {
            // connect() failed and there is no earlier channel; fail clearly instead of with an NPE.
            throw new IllegalStateException("Unable to connect to 127.0.0.1:8090");
        }

        return this.cf;
    }

    /** Builds a request populated with the given values. */
    private static Request newRequest(String id, String name, String info) {
        Request request = new Request();
        request.setId(id);
        request.setName(name);
        request.setInfo(info);
        return request;
    }

    /**
     * Sends four requests and blocks until the channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting on the channel
     */
    public static void main(String[] args) throws InterruptedException {
        final NettyClientTest c = NettyClientTest.getInstance();
        try {
            ChannelFuture future = c.getChannelFuture();

            // Only the first write is awaited; the remaining ones are fire-and-forget.
            future.channel().writeAndFlush(
                    newRequest("1", "上杉繪梨衣", "04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。")).sync();
            future.channel().writeAndFlush(
                    newRequest("2", "上杉繪梨衣", "04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"));
            future.channel().writeAndFlush(
                    newRequest("3", "上杉繪梨衣", "04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"));
            future.channel().writeAndFlush(
                    newRequest("4", "上杉繪梨衣", "Sakura最好了。"));

            future.channel().closeFuture().sync();
        } finally {
            // Stop the event loop threads; otherwise the JVM keeps running after the channel closes.
            c.group.shutdownGracefully();
        }
    }
}

ClientHandler

package com.wk.test.nettyTest.jdk;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side inbound handler for the JDK-serialization example: prints every
 * {@code Response} received from the server.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the response fields.
     *
     * @param ctx the handler context
     * @param msg the decoded message; expected to be a {@code Response}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response)msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            // The message is not passed further down the pipeline, so release it here.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Client : closing channel after error: " + cause);
        ctx.close();
    }

}

JBoss Marshalling

這種序列化效率比JDK快三倍左右,這里暫不介紹。

protobuf

谷歌開源的一種二進制數據格式,是目前序列化最快的。

相較於json和xml來說,序列化后體積小,傳輸速率快。序列化后不可讀,必須反序列化才可讀。

使用

1.下載

下載地址:https://github.com/google/protobuf/releases

這里下載protoc-3.11.4-win64,windows系統使用的protoc.exe

2.編寫proto格式文件

我們需要編寫一個.proto格式的協議文件,通過該協議文件來生產java類,具體的語法和規則可以參考官方文檔。這里只舉個例子:

Request.proto

// Use the proto3 language version.
syntax = "proto3";

// Java package of the generated source file.
option java_package = "com.wk.test.nettyTest.proto";

// Name of the generated outer Java class; the message becomes a nested class of it.
option java_outer_classname = "Request";

// Request message: a numeric id plus two text fields.
// The numbers after '=' are field numbers and must be unique within the message.
message MessageRequest{
    uint64 id = 1;
    string name = 2;
    string info = 3;
}
syntax = "proto3"; 表示使用的協議版本是 proto3
java_package 是生成文件的包路徑
java_outer_classname 是生成的外部類的類名
message MessageRequest{
    uint64 id = 1; string name = 2; string info = 3; }
消息體內容:
uint64 類型(64位無符號整數)的 id
string 類型的姓名和內容
后面的數字是字段編號(field number),用於在二進制編碼中標識字段,同一消息內不可重復

3.生成協議文件對應的消息類

CMD命令到我們下載好的protoc.exe目錄下,執行命令

protoc.exe ./Request.proto --java_out=./

生成Request.java

4.編寫代碼

准備工作已經結束了,我們將.proto文件和生成的java文件放入相對應的程序中就可以開始開發了

開發

pom.xml

        <!-- protobuf -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.11.4</version>
        </dependency>

這里注意要跟下載的protoc.exe版本一致

實體類

就是生成的java和proto文件

服務端

NettyServerTest

package com.wk.test.nettyTest.proto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                //設置日志
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance()));

                        sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        sc.pipeline().addLast(new ProtobufEncoder());
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture cf = b.bind(8090).sync();

        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

ServerHandler

package com.wk.test.nettyTest.proto;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler for the protobuf example: prints each incoming
 * {@code Request.MessageRequest} and echoes it back to the client.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the request and writes the same message back.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the decoded message; expected to be a {@code Request.MessageRequest}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request.MessageRequest request = (Request.MessageRequest)msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
        ctx.writeAndFlush(request);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: replies are flushed in channelRead.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Server : closing channel after error: " + cause);
        ctx.close();
    }

}

客戶端

NettyClientTest

package com.wk.test.nettyTest.proto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton Netty client for the protobuf example.
 *
 * <p>Connects to 127.0.0.1:8090, sends four {@code Request.MessageRequest} messages and
 * waits until the channel is closed.
 */
public class NettyClientTest {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);

    /** Holder class so the singleton is created on first call to {@link #getInstance()}. */
    private static class SingletonHolder {
        static final NettyClientTest instance = new NettyClientTest();
    }

    /** @return the shared client instance */
    public static NettyClientTest getInstance() {
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf;

    private NettyClientTest() {
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        // Bootstrap keeps only ONE handler: a separate .handler(new LoggingHandler(..))
                        // call would be overwritten by this initializer, so install logging here.
                        sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        // Inbound: split the byte stream into frames, then parse each frame.
                        sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        sc.pipeline().addLast(new ProtobufDecoder(Request.MessageRequest.getDefaultInstance()));

                        // Outbound: serialize the message, then prefix it with its length.
                        sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        sc.pipeline().addLast(new ProtobufEncoder());
                        // Timeout handler: closes the channel when nothing is read for the given
                        // time, mainly to reduce resource usage on the server side.
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
    }

    /**
     * Connects to the server and stores the resulting future. Failures are logged and leave
     * the previously stored future (possibly {@code null}) untouched.
     */
    public void connect() {
        try {
            this.cf = b.connect("127.0.0.1", 8090).sync();
            System.out.println("遠程服務器已經連接, 可以進行數據交換..");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while connecting to 127.0.0.1:8090", e);
        } catch (Exception e) {
            logger.error("Failed to connect to 127.0.0.1:8090", e);
        }
    }

    /**
     * Returns the connection future, (re)connecting first when there is no active channel.
     *
     * @return the future of the most recent successful connect
     * @throws IllegalStateException if no connection could ever be established
     */
    public ChannelFuture getChannelFuture() {

        if (this.cf == null || !this.cf.channel().isActive()) {
            this.connect();
        }
        if (this.cf == null) {
            // connect() failed and there is no earlier channel; fail clearly instead of with an NPE.
            throw new IllegalStateException("Unable to connect to 127.0.0.1:8090");
        }

        return this.cf;
    }

    /** Builds an immutable protobuf request from the given values. */
    private static Request.MessageRequest newRequest(long id, String name, String info) {
        return Request.MessageRequest.newBuilder()
                .setId(id)
                .setName(name)
                .setInfo(info)
                .build();
    }

    /**
     * Sends four requests and blocks until the channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting on the channel
     */
    public static void main(String[] args) throws InterruptedException {
        final NettyClientTest c = NettyClientTest.getInstance();
        try {
            ChannelFuture future = c.getChannelFuture();

            // Only the first write is awaited; the remaining ones are fire-and-forget.
            future.channel().writeAndFlush(
                    newRequest(1, "上杉繪梨衣", "04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。")).sync();
            future.channel().writeAndFlush(
                    newRequest(2, "上杉繪梨衣", "04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"));
            future.channel().writeAndFlush(
                    newRequest(3, "上杉繪梨衣", "04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"));
            future.channel().writeAndFlush(
                    newRequest(4, "上杉繪梨衣", "Sakura最好了。"));

            future.channel().closeFuture().sync();
        } finally {
            // Stop the event loop threads; otherwise the JVM keeps running after the channel closes.
            c.group.shutdownGracefully();
        }
    }
}

ClientHandler

package com.wk.test.nettyTest.proto;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side inbound handler for the protobuf example: prints every
 * {@code Request.MessageRequest} echoed back by the server.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the fields of the echoed message.
     *
     * @param ctx the handler context
     * @param msg the decoded message; expected to be a {@code Request.MessageRequest}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Request.MessageRequest request = (Request.MessageRequest)msg;
            // Prefix fixed from the copy-pasted "Server : " so client and server output can be told apart.
            System.out.println("Client : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());
        } finally {
            // The message is not passed further down the pipeline, so release it here.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Client : closing channel after error: " + cause);
        ctx.close();
    }

}

優缺點

優點:protobuf是目前序列化最快的沒有之一,較json,xml傳輸體積小,速率高,適合高性能通訊的應用場景

缺點:如果修改消息內容,則需要重新生成java類。proto文件和java文件不對應則報錯。

Kryo(推薦使用)

kryo是一個高效的Java對象序列化框架(並非基於protobuf),dubbo也支持使用它作為序列化方式,速率僅次於protobuf,體積小,且不用通過proto文件生成java類。

pom.xml

<!-- kryo -->
        <dependency>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
            <version>5.0.0-RC5</version>
        </dependency>

實體類 Request

package com.wk.test.nettyTest.kryo;

import java.io.Serializable;

/**
 * Request message used by the Kryo example, both as the request and as the echoed reply.
 *
 * <p>Plain mutable bean with three string properties: {@code id}, {@code name} and {@code info}.
 */
public class Request implements Serializable {

    // The class declares itself Serializable, so pin the serialized form explicitly
    // instead of relying on the compiler-derived default UID.
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String info;

    /** @return the request id, or {@code null} if none was set */
    public String getId() {
        return id;
    }

    /** @param id the request id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the sender name, or {@code null} if none was set */
    public String getName() {
        return name;
    }

    /** @param name the sender name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text, or {@code null} if none was set */
    public String getInfo() {
        return info;
    }

    /** @param info the message text */
    public void setInfo(String info) {
        this.info = info;
    }
}

封裝kryo

因為kryo是線程不安全的,因此我們要對kryo進行一層封裝

Serializer

序列化接口類

package com.wk.test.nettyTest.kryo;

/**
 * Converts objects to byte arrays and back.
 */
public interface Serializer {

    /**
     * Serializes an object.
     *
     * @param object the object to serialize
     * @return the serialized bytes
     */
    byte[] serialize(Object object);

    /**
     * Deserializes an object.
     *
     * @param bytes the serialized bytes
     * @param <T>   the type the caller expects back
     * @return the deserialized object
     */
    <T> T deserialize(byte[] bytes);
}

KryoSerializer

序列化實現類,通過ThreadLocal 使每個線程都擁有自己的kryo實例副本,線程之間不會相互影響。

package com.wk.test.nettyTest.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.BeanSerializer;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * {@link Serializer} backed by Kryo for a single, fixed class.
 *
 * <p>Each thread gets its own {@link Kryo} instance through a {@link ThreadLocal}, with the
 * target class registered against a {@link BeanSerializer}.
 */
public class KryoSerializer implements Serializer {

    private final Class<?> clazz;

    /**
     * @param clazz the class this serializer reads and writes
     */
    public KryoSerializer(Class<?> clazz){
        this.clazz = clazz;
    }


    final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>(){
        @Override
        protected Kryo initialValue(){
            Kryo kryo = new Kryo();
            kryo.register(clazz, new BeanSerializer(kryo,clazz));
            return kryo;
        }
    };

    /** Returns the Kryo instance owned by the calling thread. */
    private Kryo getKryo(){
        return kryoThreadLocal.get();
    }

    /**
     * Serializes {@code object} as an instance of the configured class.
     *
     * @param object the object to serialize; may be {@code null}
     * @return the serialized bytes
     */
    @Override
    public byte[] serialize(Object object) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (Output output = new Output(byteArrayOutputStream)) {
            // Use the configured class rather than object.getClass(): the latter throws an NPE for
            // null, defeating writeObjectOrNull, and could disagree with the class used on read.
            getKryo().writeObjectOrNull(output, object, clazz);
            // Push Kryo's internal buffer into the byte stream before copying it out.
            output.flush();
            return byteArrayOutputStream.toByteArray();
        }
    }

    /**
     * Deserializes bytes produced by {@link #serialize(Object)}.
     *
     * @param bytes the serialized form; {@code null} yields {@code null}
     * @param <T>   the type the caller expects; not checked against the configured class
     * @return the deserialized object, or {@code null}
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the object is always read as clazz
    public <T> T deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (Input input = new Input(new ByteArrayInputStream(bytes))) {
            return (T) getKryo().readObjectOrNull(input, clazz);
        }
    }
}

KryoSerializerFactory

工廠類,通過傳入class來獲取相對應的序列化工具類

package com.wk.test.nettyTest.kryo;

/**
 * Factory that hands out a {@link Serializer} for a given class.
 */
public class KryoSerializerFactory {

    /**
     * Creates a new Kryo-backed serializer for {@code clazz}.
     *
     * @param clazz the class the serializer should handle
     * @return a newly created serializer
     */
    public static Serializer getSerializer(Class<?> clazz) {
        Serializer created = new KryoSerializer(clazz);
        return created;
    }
}

編碼、解碼類(也可以稱為序列化、反序列化類)

KryoMsgEncoder

package com.wk.test.nettyTest.kryo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Outbound encoder that frames a {@code Request} as a 32-bit length header followed by its
 * serialized body.
 */
public class KryoMsgEncoder extends MessageToByteEncoder<Request> {

    private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class);

    /**
     * Writes the length header and then the serialized request into {@code byteBuf}.
     *
     * @param channelHandlerContext the handler context (unused)
     * @param request               the message to encode
     * @param byteBuf               the buffer that receives the frame
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Request request, ByteBuf byteBuf) throws Exception {
        final byte[] payload = serializer.serialize(request);
        // Header (payload size) first, payload right after it.
        byteBuf.writeInt(payload.length).writeBytes(payload);
    }
}

KryoMsgDecoder

package com.wk.test.nettyTest.kryo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Inbound decoder for frames made of a 32-bit length header followed by a serialized
 * {@code Request} body.
 */
public class KryoMsgDecoder extends ByteToMessageDecoder {

    /** Size in bytes of the length header read with {@code readInt()}. */
    private static final int HEADER_LENGTH = 4;

    private Serializer serializer = KryoSerializerFactory.getSerializer(Request.class);

    /**
     * Decodes at most one frame from {@code byteBuf}. When the frame is not complete yet the
     * reader index is left where it was so the call can be retried once more bytes arrive.
     *
     * @param channelHandlerContext the handler context; closed when the header is invalid
     * @param byteBuf               the accumulated inbound bytes
     * @param list                  receives the decoded {@code Request}, if any
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Wait for the full header; readInt() on fewer than 4 bytes would throw.
        if (byteBuf.readableBytes() < HEADER_LENGTH) {
            return;
        }
        // Remember where the frame starts so it can be re-read later.
        byteBuf.markReaderIndex();
        // Header: the length of the body.
        int dataLength = byteBuf.readInt();
        if (dataLength <= 0) {
            // Corrupt header: drop what was buffered, close the channel and stop. Without the
            // return the code below would try to allocate an array of invalid size.
            byteBuf.skipBytes(byteBuf.readableBytes());
            channelHandlerContext.close();
            return;
        }
        // Body not fully received yet: rewind to the frame start and wait for more data.
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength];
        byteBuf.readBytes(body);
        Request request = serializer.deserialize(body);
        list.add(request);
    }
}

服務端

NettyKryoServer

package com.wk.test.nettyTest.kryo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyKryoServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyKryoServer.class);

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                //設置日志
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new KryoMsgDecoder());
                        sc.pipeline().addLast(new KryoMsgEncoder());
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new KryoServerHandler());
                    }
                });

        ChannelFuture cf = b.bind(8090).sync();

        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

KryoServerHandler

package com.wk.test.nettyTest.kryo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side inbound handler for the Kryo example: prints each incoming {@code Request}
 * and echoes it back to the client.
 */
public class KryoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the request and writes the same object back.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the decoded message; expected to be a {@code Request}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request)msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getInfo());

        ctx.writeAndFlush(request);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: replies are flushed in channelRead.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Server : closing channel after error: " + cause);
        ctx.close();
    }

}

客戶端

NettyKryoClient

package com.wk.test.nettyTest.kryo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton Netty client for the Kryo example.
 *
 * <p>Connects to 127.0.0.1:8090, sends four {@code Request} objects and waits until the
 * channel is closed.
 */
public class NettyKryoClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyKryoClient.class);

    /** Holder class so the singleton is created on first call to {@link #getInstance()}. */
    private static class SingletonHolder {
        static final NettyKryoClient instance = new NettyKryoClient();
    }

    /** @return the shared client instance */
    public static NettyKryoClient getInstance() {
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf;

    private NettyKryoClient() {
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        // Bootstrap keeps only ONE handler: a separate .handler(new LoggingHandler(..))
                        // call would be overwritten by this initializer, so install logging here.
                        sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        sc.pipeline().addLast(new KryoMsgDecoder());
                        sc.pipeline().addLast(new KryoMsgEncoder());
                        // Timeout handler: closes the channel when nothing is read for the given
                        // time, mainly to reduce resource usage on the server side.
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new KryoClientHandler());
                    }
                });
    }

    /**
     * Connects to the server and stores the resulting future. Failures are logged and leave
     * the previously stored future (possibly {@code null}) untouched.
     */
    public void connect() {
        try {
            this.cf = b.connect("127.0.0.1", 8090).sync();
            System.out.println("遠程服務器已經連接, 可以進行數據交換..");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while connecting to 127.0.0.1:8090", e);
        } catch (Exception e) {
            logger.error("Failed to connect to 127.0.0.1:8090", e);
        }
    }

    /**
     * Returns the connection future, (re)connecting first when there is no active channel.
     *
     * @return the future of the most recent successful connect
     * @throws IllegalStateException if no connection could ever be established
     */
    public ChannelFuture getChannelFuture() {

        if (this.cf == null || !this.cf.channel().isActive()) {
            this.connect();
        }
        if (this.cf == null) {
            // connect() failed and there is no earlier channel; fail clearly instead of with an NPE.
            throw new IllegalStateException("Unable to connect to 127.0.0.1:8090");
        }

        return this.cf;
    }

    /** Builds a request populated with the given values. */
    private static Request newRequest(String id, String name, String info) {
        Request request = new Request();
        request.setId(id);
        request.setName(name);
        request.setInfo(info);
        return request;
    }

    /**
     * Sends four requests and blocks until the channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting on the channel
     */
    public static void main(String[] args) throws InterruptedException {

        final NettyKryoClient c = NettyKryoClient.getInstance();
        try {
            ChannelFuture future = c.getChannelFuture();

            // Only the first write is awaited; the remaining ones are fire-and-forget.
            future.channel().writeAndFlush(
                    newRequest("1", "上杉繪梨衣", "04.24,和Sakura去東京天空樹,世界上最暖和的地方在天空樹的頂上。")).sync();
            future.channel().writeAndFlush(
                    newRequest("2", "上杉繪梨衣", "04.26,和Sakura去明治神宮,有人在那里舉辦婚禮。"));
            future.channel().writeAndFlush(
                    newRequest("3", "上杉繪梨衣", "04.25,和Sakura去迪士尼,鬼屋很可怕,但是有Sakura在,所以不可怕。"));
            future.channel().writeAndFlush(
                    newRequest("4", "上杉繪梨衣", "Sakura最好了。"));

            future.channel().closeFuture().sync();
        } finally {
            // Stop the event loop threads; otherwise the JVM keeps running after the channel closes.
            c.group.shutdownGracefully();
        }
    }
}

KryoClientHandler

package com.wk.test.nettyTest.kryo;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Client-side inbound handler for the Kryo example: prints every {@code Request} echoed
 * back by the server.
 */
public class KryoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: nothing to do when the connection opens.
    }

    /**
     * Prints the fields of the echoed message.
     *
     * @param ctx the handler context
     * @param msg the decoded message; expected to be a {@code Request}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Request resp = (Request)msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getInfo());
        } finally {
            // The message is not passed further down the pipeline, so release it here.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty.
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report before closing; otherwise decode errors and read timeouts vanish without a trace.
        System.err.println("Client : closing channel after error: " + cause);
        ctx.close();
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM