Netty實現高性能RPC服務器


Netty實現高性能RPC服務器

 在本人寫的前一篇文章中,談及有關如何利用Netty開發實現,高性能RPC服務器的一些設計思路、設計原理,以及具體的實現方案(具體參見:談談如何使用Netty開發實現高性能的RPC服務器)。在文章的最后提及到,其實基於該方案設計的RPC服務器的處理性能,還有優化的余地。於是利用周末的時間,在原來NettyRPC框架的基礎上,加以優化重構,本次主要優化改造點如下:

  1、NettyRPC中對RPC消息進行編碼、解碼采用的是Netty自帶的ObjectEncoder、ObjectDecoder(對象編碼、解碼器),該編碼、解碼器基於的是Java的原生序列化機制,從已有的文章以及測試數據來看,Java的原生序列化性能效率不高,而且產生的序列化二進制碼流太大,故本次在優化中,引入RPC消息序列化協議的概念。所謂消息序列化協議,就是針對RPC消息的序列化、反序列化過程進行特殊的定制,引入第三方編解碼框架。本次引入的第三方編解碼框架有Kryo、Hessian。這里,不得不再次提及一下,對象序列化、反序列化的概念,在RPC的遠程服務調用過程中,需要把消息對象通過網絡傳輸,這個就要用到序列化將對象轉變成字節流,到達另外一端之后,再反序列化回來變成消息對象。

  2、引入Google Guava並發編程框架對NettyRPC的NIO線程池、業務線程池進行重新梳理封裝。

  3、利用第三方編解碼框架(Kryo、Hessian)的時候,考慮到高並發的場景下,頻繁的創建、銷毀序列化對象,會非常消耗JVM的內存資源,影響整個RPC服務器的處理性能,因此引入對象池化(Object Pooling)技術。眾所周知,創建新對象並初始化,可能會消耗很多的時間。當需要產生大量對象的時候,可能會對性能造成一定的影響。為了解決這個問題,除了提升硬件條件之外,對象池化技術就是這方面的銀彈,而Apache Commons Pool框架就是對象池化技術的一個很好的實現(開源項目路徑:http://commons.apache.org/proper/commons-pool/download_pool.cgi)。本文中的Hessian池化工作,主要是基於Apache Commons Pool框架,進行封裝處理。

  本文將着重,從上面的三個方面,對重構優化之后的NettyRPC服務器的實現思路、實現方式進行重點講解。首先請大家簡單看下,本次優化之后的NettyRPC服務器支持的序列化協議,如下圖所示:

  

  可以很清楚的看到,優化之后的NettyRPC可以支持Kryo、Hessian、Java本地序列化三種消息序列化方式。其中Java本地序列化方式,相信大家應該很熟悉了,再次不在重復講述。現在我們重點講述一下,另外兩種序列化方式:

  1、Kryo序列化。它是針對Java,而定制實現的高效對象序列化框架,相比Java本地原生序列化方式,Kryo在處理性能上、碼流大小上等等方面有很大的優化改進。目前已知的很多著名開源項目,都引入采用了該序列化方式。比如alibaba開源的dubbo RPC等等。本文中采用的Kryo的默認版本是基於:kryo-3.0.3。它的下載鏈接是:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3。為什么采用這個版本?主要原因我上面也說明了,出於應對高並發場景下,頻繁地創建、銷毀序列化對象,會非常消耗JVM的內存資源、以及時間。Kryo的這個發行版本中,集成引入了序列化對象池功能模塊(KryoFactory、KryoPool),這樣我們就不必再利用Apache Commons Pool對其進行二次封裝。

  2、Hessian序列化。Hessian本身是一種序列化協議,它比Java原生的序列化、反序列化速度更快、序列化出來的數據也更小。它是采用二進制格式進行數據傳輸,而且,目前支持多種語言格式。本文中采用的是:hessian-4.0.37 版本,它的下載鏈接是:http://hessian.caucho.com/#Java

  接下來,先來看下優化之后的NettyRPC的消息協議編解碼包(newlandframework.netty.rpc.serialize.support、newlandframework.netty.rpc.serialize.support.kryo、newlandframework.netty.rpc.serialize.support.hessian)的結構,如下圖所示:

     

  其中RPC請求消息結構代碼如下:

復制代碼
/**
 * @filename:MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務請求結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class<?>[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
    }
}
復制代碼

  RPC應答消息結構,如下所示:

復制代碼
/**
 * @filename:MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務應答結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return ReflectionToStringBuilder.toString(this);
    }
}
復制代碼

  現在,我們就來對上述的RPC請求消息、應答消息進行編解碼框架的設計。由於NettyRPC中的協議類型,目前已經支持Kryo序列化、Hessian序列化、Java原生本地序列化方式。考慮到可擴展性,故要抽象出RPC消息序列化,協議類型對象(RpcSerializeProtocol),它的代碼實現如下所示:

復制代碼
/**
 * @filename:RpcSerializeProtocol.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序序列化協議類型
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public enum RpcSerializeProtocol {

    //目前由於沒有引入跨語言RPC通信機制,暫時采用支持同構語言Java序列化/反序列化機制的第三方插件
    //NettyRPC目前已知的序列化插件有:Java原生序列化、Kryo、Hessian
    JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");

    private String serializeProtocol;

    private RpcSerializeProtocol(String serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }

    public String toString() {
        ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
        return ReflectionToStringBuilder.toString(this);
    }

    public String getProtocol() {
        return serializeProtocol;
    }
}
復制代碼

  針對不同編解碼序列化的框架(這里主要是指Kryo、Hessian),再抽象、萃取出一個RPC消息序列化/反序列化接口(RpcSerialize)、RPC消息編解碼接口(MessageCodecUtil)。

復制代碼
/**
 * @filename:RpcSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序列化/反序列化接口定義
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface RpcSerialize {

    void serialize(OutputStream output, Object object) throws IOException;

    Object deserialize(InputStream input) throws IOException;
}
復制代碼
復制代碼
/**
 * @filename:MessageCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息編解碼接口
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import java.io.IOException;

public interface MessageCodecUtil {

    //RPC消息報文頭長度4個字節
    final public static int MESSAGE_LENGTH = 4;

    public void encode(final ByteBuf out, final Object message) throws IOException;

    public Object decode(byte[] body) throws IOException;
}
復制代碼

  最后我們的NettyRPC框架要能自由地支配、定制Netty的RPC服務端、客戶端,采用何種序列化來進行RPC消息對象的網絡傳輸。因此,要再抽象一個RPC消息序列化協議選擇器接口(RpcSerializeFrame),對應的實現如下:

復制代碼
/**
 * @filename:RpcSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序序列化協議選擇器接口
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.channel.ChannelPipeline;

public interface RpcSerializeFrame {

    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}
復制代碼

  現在有了上面定義的一系列的接口,現在就可以定制實現,基於Kryo、Hessian方式的RPC消息序列化、反序列化模塊了。先來看下整體的類圖結構:

  首先是RPC消息的編碼器MessageEncoder,它繼承自Netty的MessageToByteEncoder編碼器。主要是把RPC消息對象編碼成二進制流的格式,對應實現如下:

復制代碼
/**
 * @filename:MessageEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息編碼接口
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<Object> {

    private MessageCodecUtil util = null;

    public MessageEncoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
        util.encode(out, msg);
    }
}
復制代碼

  接下來是RPC消息的解碼器MessageDecoder,它繼承自Netty的ByteToMessageDecoder。主要針對二進制流反序列化成消息對象。當然了,在之前的一篇文章中我曾經提到,NettyRPC是基於TCP協議的,TCP在傳輸數據的過程中會出現所謂的“粘包”現象,所以我們的MessageDecoder要對RPC消息體的長度進行校驗,如果不滿足RPC消息報文頭中指定的消息體長度,我們直接重置一下ByteBuf讀索引的位置,具體可以參考如下的代碼方式,進行RPC消息協議的解析:

復制代碼
/**
 * @filename:MessageDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息解碼接口
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageDecoder extends ByteToMessageDecoder {

    final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
    private MessageCodecUtil util = null;

    public MessageDecoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        //出現粘包導致消息頭長度不對,直接返回
        if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
            return;
        }

        in.markReaderIndex();
        //讀取消息的內容長度
        int messageLength = in.readInt();
        
        if (messageLength < 0) {
            ctx.close();
        }

        //讀到的消息長度和報文頭的已知長度不匹配。那就重置一下ByteBuf讀索引的位置
        if (in.readableBytes() < messageLength) {
            in.resetReaderIndex();
            return;
        } else {
            byte[] messageBody = new byte[messageLength];
            in.readBytes(messageBody);

            try {
                Object obj = util.decode(messageBody);
                out.add(obj);
            } catch (IOException ex) {
                Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}
復制代碼

  現在,我們進一步實現,利用Kryo序列化方式,對RPC消息進行編解碼的模塊。首先是要實現NettyRPC消息序列化接口(RpcSerialize)的方法。

復制代碼
/**
 * @filename:KryoSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo序列化/反序列化實現
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.RpcSerialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class KryoSerialize implements RpcSerialize {

    private KryoPool pool = null;

    public KryoSerialize(final KryoPool pool) {
        this.pool = pool;
    }

    public void serialize(OutputStream output, Object object) throws IOException {
        Kryo kryo = pool.borrow();
        Output out = new Output(output);
        kryo.writeClassAndObject(out, object);
        out.close();
        pool.release(kryo);
    }

    public Object deserialize(InputStream input) throws IOException {
        Kryo kryo = pool.borrow();
        Input in = new Input(input);
        Object result = kryo.readClassAndObject(in);
        in.close();
        pool.release(kryo);
        return result;
    }
}
復制代碼

   接着利用Kryo庫里面的對象池,對RPC消息對象進行編解碼。首先是Kryo對象池工廠(KryoPoolFactory),這個也是我為什么選擇kryo-3.0.3版本的原因了。代碼如下:

復制代碼
/**
 * @filename:KryoPoolFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo對象池工廠
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class KryoPoolFactory {

    private static KryoPoolFactory poolFactory = null;

    private KryoFactory factory = new KryoFactory() {
        public Kryo create() {
            Kryo kryo = new Kryo();
            kryo.setReferences(false);
            //把已知的結構注冊到Kryo注冊器里面,提高序列化/反序列化效率
            kryo.register(MessageRequest.class);
            kryo.register(MessageResponse.class);
            kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
            return kryo;
        }
    };

    private KryoPool pool = new KryoPool.Builder(factory).build();

    private KryoPoolFactory() {
    }

    public static KryoPool getKryoPoolInstance() {
        if (poolFactory == null) {
            synchronized (KryoPoolFactory.class) {
                if (poolFactory == null) {
                    poolFactory = new KryoPoolFactory();
                }
            }
        }
        return poolFactory.getPool();
    }

    public KryoPool getPool() {
        return pool;
    }
}
復制代碼

  Kryo對RPC消息進行編碼、解碼的工具類KryoCodecUtil,實現了RPC消息編解碼接口(MessageCodecUtil),具體實現方式如下:

復制代碼
/**
 * @filename:KryoCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo編解碼工具類
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import com.google.common.io.Closer;

public class KryoCodecUtil implements MessageCodecUtil {

    private KryoPool pool;
    private static Closer closer = Closer.create();

    public KryoCodecUtil(KryoPool pool) {
        this.pool = pool;
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            kryoSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            Object obj = kryoSerialization.deserialize(byteArrayInputStream);
            return obj;
        } finally {
            closer.close();
        }
    }
}
復制代碼

  最后是,Kryo自己的編碼器、解碼器,其實只要調用Kryo編解碼工具類(KryoCodecUtil)里面的encode、decode方法就可以了。現在貼出具體的代碼:

復制代碼
/**
 * @filename:KryoDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo解碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class KryoDecoder extends MessageDecoder {

    public KryoDecoder(MessageCodecUtil util) {
        super(util);
    }
}
復制代碼
復制代碼
/**
 * @filename:KryoEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo編碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class KryoEncoder extends MessageEncoder {

    public KryoEncoder(MessageCodecUtil util) {
        super(util);
    }
}
復制代碼

  最后,我們再來實現一下,利用Hessian實現RPC消息的編碼、解碼器代碼模塊。首先還是Hessian序列化/反序列化實現(HessianSerialize),它同樣實現了RPC消息序列化/反序列化接口(RpcSerialize),對應的代碼如下:

復制代碼
/**
 * @filename:HessianSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化實現
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;

public class HessianSerialize implements RpcSerialize {

    public void serialize(OutputStream output, Object object) {
        Hessian2Output ho = new Hessian2Output(output);
        try {
            ho.startMessage();
            ho.writeObject(object);
            ho.completeMessage();
            ho.close();
            output.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Object deserialize(InputStream input) {
        Object result = null;
        try {
            Hessian2Input hi = new Hessian2Input(input);
            hi.startMessage();
            result = hi.readObject();
            hi.completeMessage();
            hi.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
}
復制代碼

  現在利用對象池(Object Pooling)技術,對Hessian序列化/反序列化類(HessianSerialize)進行池化處理,對應的代碼如下:

復制代碼
/**
 * @filename:HessianSerializeFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化對象工廠池
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {

    public HessianSerialize create() throws Exception {
        return createHessian();
    }

    public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
        return new DefaultPooledObject<HessianSerialize>(hessian);
    }

    private HessianSerialize createHessian() {
        return new HessianSerialize();
    }
}
復制代碼
復制代碼
/**
 * @filename:HessianSerializePool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化池
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HessianSerializePool {

    //Netty采用Hessian序列化/反序列化的時候,為了避免重復產生對象,提高JVM內存利用率,故引入對象池技術,經過測試
    //遇到高並發序列化/反序列化的場景的時候,序列化效率明顯提升不少。
    private GenericObjectPool<HessianSerialize> hessianPool;
    private static HessianSerializePool poolFactory = null;

    private HessianSerializePool() {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
    }

    public static HessianSerializePool getHessianPoolInstance() {
        if (poolFactory == null) {
            synchronized (HessianSerializePool.class) {
                if (poolFactory == null) {
                    poolFactory = new HessianSerializePool();
                }
            }
        }
        return poolFactory;
    }

    //預留接口,后續可以通過Spring Property Placeholder依賴注入
    public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        //最大池對象總數
        config.setMaxTotal(maxTotal);
        //最小空閑數
        config.setMinIdle(minIdle);
        //最大等待時間, 默認的值為-1,表示無限等待
        config.setMaxWaitMillis(maxWaitMillis);
        //退出連接的最小空閑時間 默認1800000毫秒
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hessianPool.setConfig(config);
    }

    public HessianSerialize borrow() {
        try {
            return getHessianPool().borrowObject();
        } catch (final Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public void restore(final HessianSerialize object) {
        getHessianPool().returnObject(object);
    }

    public GenericObjectPool<HessianSerialize> getHessianPool() {
        return hessianPool;
    }
}
復制代碼

  Hessian序列化對象經過池化處理之后,我們通過Hessian編解碼工具類,來“借用”Hessian序列化對象(HessianSerialize),當然了,你借出來之后,一定要還回去嘛。Hessian編解碼工具類的實現方式如下:

復制代碼
/**
 * @filename:HessianCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian編解碼工具類
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;

public class HessianCodecUtil implements MessageCodecUtil {

    HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
    private static Closer closer = Closer.create();

    public HessianCodecUtil() {

    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            hessianSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
            pool.restore(hessianSerialization);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            Object object = hessianSerialization.deserialize(byteArrayInputStream);
            pool.restore(hessianSerialization);
            return object;
        } finally {
            closer.close();
        }
    }
}
復制代碼

  最后Hessian對RPC消息的編碼器、解碼器參考實現代碼如下所示:

復制代碼
/**
 * @filename:HessianDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian解碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class HessianDecoder extends MessageDecoder {

    public HessianDecoder(MessageCodecUtil util) {
        super(util);
    }
}
復制代碼
復制代碼
/**
 * @filename:HessianEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian編碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class HessianEncoder extends MessageEncoder {

    public HessianEncoder(MessageCodecUtil util) {
        super(util);
    }
}
復制代碼

  到目前為止,NettyRPC所針對的Kryo、Hessian序列化協議模塊,已經設計實現完畢,現在我們就要把這個協議,嵌入NettyRPC的核心模塊包(newlandframework.netty.rpc.core),下面只給出優化調整之后的代碼,其它代碼模塊的內容,可以參考我上一篇的文章:談談如何使用Netty開發實現高性能的RPC服務器。好了,我們先來看下,NettyRPC核心模塊包(newlandframework.netty.rpc.core)的層次結構:

     

  先來看下,NettyRPC服務端的實現部分。首先是,Rpc服務端管道初始化(MessageRecvChannelInitializer),跟上一版本對比,主要引入了序列化消息對象(RpcSerializeProtocol),具體實現代碼如下:

復制代碼
/**
 * @filename:MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務端管道初始化
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcRecvSerializeFrame frame = null;

    MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }

    MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
        frame = new RpcRecvSerializeFrame(handlerMap);
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}
復制代碼

  Rpc服務器執行模塊(MessageRecvExecutor)中,默認的序列化采用Java原生本地序列化機制,並且優化了線程池異步調用的層次結構。具體代碼如下:

復制代碼
/**
 * @filename:MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務器執行模塊
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    //默認JKD本地序列化協議
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    private final static String DELIMITER = ":";

    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();

    private static ListeningExecutorService threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
        this.serverAddress = serverAddress;
        this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
    }

    public static void submit(Callable<Boolean> task, ChannelHandlerContext ctx, MessageRequest request, MessageResponse response) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
                }
            }
        }

        ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
        //Netty服務端把計算結果異步返回
        Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
            public void onSuccess(Boolean result) {
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("RPC Server Send message-id respone:" + request.getMessageId());
                    }
                });
            }

            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, threadPoolExecutor);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();

            Set s = rpcServiceObject.entrySet();
            Iterator<Map.Entry<String, Object>> it = s.iterator();
            Map.Entry<String, Object> entry;

            while (it.hasNext()) {
                entry = it.next();
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        //netty的線程池模型設置成主從線程池模式,這樣可以應對高並發請求
        //當然netty還支持單線程、多線程網絡IO模型,可以根據業務需求靈活配置
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");

        //方法返回到Java虛擬機的可用的處理器數量
        int parallel = Runtime.getRuntime().availableProcessors() * 2;

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success!\nip:%s\nport:%d\nprotocol:%s\n\n", host, port, serializeProtocol);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
復制代碼

  Rpc服務器消息處理(MessageRecvHandler)也跟隨着調整:

復制代碼
/**
 * @filename:MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務器消息處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Object> handlerMap;

    public MessageRecvHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
        //不要阻塞nio線程,復雜的業務邏輯丟給專門的線程池
        MessageRecvExecutor.submit(recvTask, ctx, request, response);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //網絡有異常要關閉通道
        ctx.close();
    }
}
復制代碼

  Rpc服務器消息線程任務處理(MessageRecvInitializeTask)完成的任務也更加單純,即根據RPC消息的請求報文,利用反射得到最終的計算結果,並把結果寫入RPC應答報文結構。代碼如下:

復制代碼
/**
 * @filename:MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務器消息線程任務處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.lang.reflect.MethodUtils;

public class MessageRecvInitializeTask implements Callable<Boolean> {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map<String, Object> handlerMap = null;
    private ChannelHandlerContext ctx = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
        this.ctx = ctx;
    }

    public Boolean call() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
            return Boolean.TRUE;
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
            return Boolean.FALSE;
        }
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}
復制代碼

  剛才說到了,NettyRPC的服務端,可以選擇具體的序列化協議,目前是通過硬編碼方式實現。后續可以考慮,通過Spring IOC方式,依賴注入。其對應代碼如下:

復制代碼
/**
 * @filename:RpcRecvSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC服務端消息序列化協議框架
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;

public class RpcRecvSerializeFrame implements RpcSerializeFrame {

    private Map<String, Object> handlerMap = null;

    public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    //后續可以優化成通過spring ioc方式注入
    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
        }
    }
}
復制代碼

  到目前為止,NettyRPC的服務端的設計實現,已經告一段落。

  現在繼續實現一下NettyRPC的客戶端模塊。其中,Rpc客戶端管道初始化(MessageSendChannelInitializer)模塊調整之后,同樣也支持選擇具體的消息序列化協議(RpcSerializeProtocol)。代碼如下:

復制代碼
/**
 * @filename:MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端管道初始化
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();

    MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }
    
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}
復制代碼

  Rpc客戶端執行模塊(MessageSendExecutor)代碼實現如下:

復制代碼
/**
 * @filename:MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端執行模塊
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.reflect.Reflection;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor() {
    }

    public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Reflection.newProxy(rpcInterface, new MessageSendProxy<T>());
    }
}
復制代碼

  Rpc客戶端線程任務處理(MessageSendInitializeTask),其中參數增加了協議類型(RpcSerializeProtocol),具體代碼如下:

復制代碼
/**
 * @filename:MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端線程任務處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendInitializeTask implements Callable<Boolean> {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcSerializeProtocol protocol;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.protocol = protocol;
    }

    public Boolean call() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    RpcServerLoader.getInstance().setMessageSendHandler(handler);
                }
            }
        });
        return Boolean.TRUE;
    }
}
復制代碼

  Rpc客戶端消息處理(MessageSendProxy)的實現方式調整重構之后,如下所示:

復制代碼
/**
 * @filename:MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端消息處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;
import com.google.common.reflect.AbstractInvocationHandler;

public class MessageSendProxy<T> extends AbstractInvocationHandler {

    public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}
復制代碼

  同樣,NettyRPC的客戶端也是可以選擇協議類型的,必須注意的是,NettyRPC的客戶端和服務端的協議類型必須一致,才能互相通信。NettyRPC的客戶端消息序列化協議框架代碼實現方式如下:

復制代碼
/**
 * @filename:RpcSendSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC客戶端消息序列化協議框架
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcSendSerializeFrame implements RpcSerializeFrame {

    //后續可以優化成通過spring ioc方式注入
    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
        }
    }
}
復制代碼

  最后,NettyRPC客戶端,要加載NettyRPC服務端的一些上下文(Context)信息。因此,RPC服務器配置加載(RpcServerLoader)的代碼重構調整如下:

復制代碼
/**
 * @filename:RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務器配置加載
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";
    //默認采用Java原生序列化協議方式傳輸RPC消息
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    //方法返回到Java虛擬機的可用的處理器數量
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    //netty nio線程池
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
    private MessageSendHandler messageSendHandler = null;

    //等待Netty服務端鏈路建立通知信號
    private Lock lock = new ReentrantLock();
    private Condition connectStatus = lock.newCondition();
    private Condition handlerStatus = lock.newCondition();

    private RpcServerLoader() {
    }

    //並發雙重鎖定
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

            ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));

            //監聽線程池異步的執行結果成功與否再決定是否喚醒全部的客戶端RPC線程
            Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean result) {
                    try {
                        lock.lock();

                        if (messageSendHandler == null) {
                            handlerStatus.await();
                        }

                        //Futures異步回調,喚醒所有rpc等待線程
                        if (result == Boolean.TRUE && messageSendHandler != null) {
                            connectStatus.signalAll();
                        }
                    } catch (InterruptedException ex) {
                        Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
                    } finally {
                        lock.unlock();
                    }
                }

                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            }, threadPoolExecutor);
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            handlerStatus.signal();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            //Netty服務端鏈路沒有建立完畢之前,先掛起等待
            if (messageSendHandler == null) {
                connectStatus.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }

    public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }
}
復制代碼

到目前為止,NettyRPC的主要核心模塊的代碼,全部呈現出來了。到底經過改良重構之后,NettyRPC服務器的性能如何?還是那句話,實踐是檢驗真理的唯一標准。現在,我們就來啟動三台NettyRPC服務器進行驗證。具體服務端的配置參數,參考如下:

1、Java原生本地序列化NettyRPC服務器,對應IP為:127.0.0.1:18887。

2、Kryo序列化NettyRPC服務器,對應IP為:127.0.0.1:18888。

3、Hessian序列化NettyRPC服務器,對應IP為:127.0.0.1:18889。

具體的Spring配置文件結構如下所示:

 

參數配置的內容如下:

rpc-server-jdknative.properties

#rpc server's ip address config
rpc.server.addr=127.0.0.1:18887

rpc-server-kryo.properties

#rpc server's ip address config
rpc.server.addr=127.0.0.1:18888

rpc-server-hessian.properties

#rpc server's ip address config
rpc.server.addr=127.0.0.1:18889

rpc-invoke-config-jdknative.xml

復制代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
  <context:component-scan base-package="newlandframework.netty.rpc.core"/>
  <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-jdknative.properties"/>
  <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
    <property name="messageKeyVal">
      <map>
        <entry key="newlandframework.netty.rpc.servicebean.Calculate">
          <ref bean="calc"/>
        </entry>
      </map>
    </property>
  </bean>
  <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
  <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
    <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
    <constructor-arg name="serializeProtocol" value="JDKSERIALIZE"/>
  </bean>
</beans>
復制代碼

rpc-invoke-config-kryo.xml

復制代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
  <context:component-scan base-package="newlandframework.netty.rpc.core"/>
  <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-kryo.properties"/>
  <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
    <property name="messageKeyVal">
      <map>
        <entry key="newlandframework.netty.rpc.servicebean.Calculate">
          <ref bean="calc"/>
        </entry>
      </map>
    </property>
  </bean>
  <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
  <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
    <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
    <constructor-arg name="serializeProtocol" value="KRYOSERIALIZE"/>
  </bean>
</beans>
復制代碼

rpc-invoke-config-hessian.xml

復制代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
  <context:component-scan base-package="newlandframework.netty.rpc.core"/>
  <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-hessian.properties"/>
  <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
    <property name="messageKeyVal">
      <map>
        <entry key="newlandframework.netty.rpc.servicebean.Calculate">
          <ref bean="calc"/>
        </entry>
      </map>
    </property>
  </bean>
  <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
  <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
    <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
    <constructor-arg name="serializeProtocol" value="HESSIANSERIALIZE"/>
  </bean>
</beans>
復制代碼

  然后,對應的NettRPC服務器啟動方式參考如下:

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-jdknative.xml");
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-kryo.xml");
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-hessian.xml");

  如果一切順利的話,在控制台上,會打印出支持Java原生序列化、Kryo序列化、Hessian序列化的NettyRPC服務器的啟動信息,具體截圖如下:

  首先是Java原生序列化NettyRPC啟動成功截圖:

     

  然后是Kryo序列化NettyRPC啟動成功截圖:

     

  最后是Hessian序列化NettyRPC啟動成功截圖:

     

  現在,還是跟我上一篇文章用到的並發測試用例一樣,設計構造一個,瞬時值並行度1W的求和計算RPC請求,總共請求10筆,然后觀察每一筆具體協議(Java原生序列化、Kryo、Hessian)的RPC消息編碼、解碼消耗時長(毫秒)。

  測試代碼如下所示:

復制代碼
/**
 * @filename:RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc並發測試代碼
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void parallelTask(MessageSendExecutor executor, int parallel, String serverAddress, RpcSerializeProtocol protocol) throws InterruptedException {
        //開始計時
        StopWatch sw = new StopWatch();
        sw.start();

        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);

        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }

        //10000個並發線程瞬間發起請求操作
        signal.countDown();
        finish.await();
        sw.stop();

        String tip = String.format("[%s] RPC調用總共耗時: [%s] 毫秒", protocol, sw.getTime());
        System.out.println(tip);

    }

    //JDK本地序列化協議
    public static void JdkNativeParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18887";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.JDKSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    //Kryo序列化協議
    public static void KryoParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18888";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.KRYOSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    //Hessian序列化協議
    public static void HessianParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18889";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.HESSIANSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    public static void main(String[] args) throws Exception {
        //並行度10000
        int parallel = 10000;
        MessageSendExecutor executor = new MessageSendExecutor();

        for (int i = 0; i < 10; i++) {
            JdkNativeParallelTask(executor, parallel);
            KryoParallelTask(executor, parallel);
            HessianParallelTask(executor, parallel);
            System.out.printf("[author tangjie] Netty RPC Server 消息協議序列化第[%d]輪並發驗證結束!\n\n", i);
        }

        executor.stop();
    }
}
復制代碼

  運行截圖如下:

  現在,我就收集匯總一下測試數據,分析對比一下,每一種協議對RPC消息序列化/反序列化的性能(注意:由於每台計算機的配置差異,下面的測試結論可能存在出入,本次測試結果僅僅是學習交流之用!)。

  經過10輪的壓力測試,具體的數據如下所示:

 

  可以很明顯的發現,經過上述代碼框架優化調整之后,Java原生本地序列化的處理性能,跟之前博客文章中設計實現處理性能上對比,運行效率有較大的提升(RPC消息序列化/反序列耗時更少)。Java本地序列化、Kryo序列化、Hessian序列化在10次的壓力測試中,分別有1次耗時大於10S(秒)的操作。經過統計分析之后,結果如下圖:

     

  Kryo序列化、Hessian序列化的性能不分伯仲,並且總體優於Java本地序列化的性能水平。

  再來看下,10輪壓力測試中,Java本地序列化、Kryo序列化、Hessian序列化的耗時波動情況,如下圖所示:

    

  可以很清楚的發現,三種序列化方式分別有個“拐點”,除開這個“拐點”,三種序列化方式耗時相對來說比較平穩。但是總體而言,Kryo、Hessian序列化耗時有適當的波動,震盪相對比較明顯;而Java原生序列化耗時相對來說比較平穩,沒有出現頻繁的震盪,但是耗時較長。

  寫在最后:本文是前一篇文章“談談如何使用Netty開發實現高性能的RPC服務器”的性能優化篇,主要從RPC消息序列化機制、對象池(Object Pooling)、多線程優化等角度,對之前設計實現的基於Netty的RPC服務器框架進行優化重構。當然目前的RPC服務器,還僅僅處於“各自為政”的狀態,能不能把集群中的若干台RPC服務器,通過某種機制進行統一的分布式協調管理、以及服務調度呢?答案是肯定的,一種可行的方案就是引入Zookeeper,進行服務治理。后續有時間,我會繼續加以優化改良,到時再以博客的形式,呈現給大家!由於本人的認知水平、技術能力的限制,本文中涉及的技術觀點、測試數據、測試結論等等,僅限於博客園中園友們的學習交流之用。如果本人有說得不對的地方,歡迎各位園友批評指正!

  洋洋灑灑地寫了這么多,感謝您的耐心閱讀。相信讀完本篇文章,面前的您,對於利用Java開發高性能的服務端應用,又多了一份了解和自信。路漫漫其修遠兮,吾將上下而求索。對於軟件知識的求學探索之路沒有止境,謹以此話和大家共勉之!

 
 
標簽:  Java


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM