手寫RPC框架(netty+zookeeper)


  RPC是什么?遠程過程調用,過程就是業務處理、計算任務,像調用本地方法一樣調用遠程的過程。

  RMI和RPC的區別是什么?RMI是遠程方法調用,是oop領域中RPC的一種實現,我們熟悉的restfull和webservice都是RPC,僅僅消息的組織方式和消息協議不同。

  RPC調用過程 :

  1、客戶端處理過程中調用client sub(像調用本地方法一樣),傳遞參數
  2、client sub將參數編組為消息,然后通過系統調用想服務端發送消息
  3、客戶端本地操作系統將消息發送給服務端
  4、服務端操作系統將收到的消息包傳給server sub,
  5、server sub解組消息為參數
  6、server sub 調用本地服務,執行結果以反方向相同步驟返回給客戶端

  RPC協議 消息由哪些部分構成及消息的表示形式就構成了消息協議,RPC調用過程中采用的消息協議稱為RPC協議,可以使用通用的協議(http、https),也可以自定義協議

  RPC框架 封裝好參數編組、消息解組、底層通信的RPC程序開發框架,可以在其基礎上只需專注於過程代碼編寫,例如常用的dubbo和springcloud。

  實現RPC的要點有:消息編組解組、服務注冊發現和底層通信,本次基於JDK序列化編組解組消息、zookeeper服務注冊發現及netty底層通信來實現自己的RPC框架

  客戶端及服務端類圖如下

  消息協議

package com.example.demo.protocol;

/**
 * @author hehang on 2019-09-17
 * @description 請求
 */
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class Request implements Serializable {

    private static final long serialVersionUID = -5200571424236772650L;

    private String serviceName;

    private String method;

    private Map<String, String> headers = new HashMap<String, String>();

    private Class<?>[] prameterTypes;

    private Object[] parameters;

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public void setHeaders(Map<String, String> headers) {
        this.headers = headers;
    }

    public Class<?>[] getPrameterTypes() {
        return prameterTypes;
    }

    public void setPrameterTypes(Class<?>[] prameterTypes) {
        this.prameterTypes = prameterTypes;
    }

    public void setParameters(Object[] prameters) {
        this.parameters = prameters;
    }

    public String getHeader(String name) {
        return this.headers == null ? null : this.headers.get(name);
    }

    public Object[] getParameters() {
        return this.parameters;
    }

}
package com.example.demo.protocol;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * @author hehang on 2019-09-17
 * @description 響應
 */
public class Response implements Serializable {

    private static final long serialVersionUID = -4317845782629589997L;

    private Status status;

    private Map<String, String> headers = new HashMap<String, String>();

    private Object returnValue;

    private Exception exception;

    public Response() {
    }

    ;

    public Response(Status status) {
        this.status = status;
    }

    public void setStatus(Status status) {
        this.status = status;
    }

    public void setHeaders(Map<String, String> headers) {
        this.headers = headers;
    }

    public void setReturnValue(Object returnValue) {
        this.returnValue = returnValue;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

    public Status getStatus() {
        return status;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public Object getReturnValue() {
        return returnValue;
    }

    public Exception getException() {
        return exception;
    }

    public String getHeader(String name) {
        return this.headers == null ? null : this.headers.get(name);
    }

    public void setHaader(String name, String value) {
        this.headers.put(name, value);

    }

}
package com.example.demo.protocol;

public enum Status {
    SUCCESS(200, "SUCCESS"), ERROR(500, "ERROR"), NOT_FOUND(404, "NOT FOUND");

    private int code;

    private String message;

    private Status(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public int getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }
}
package com.example.demo.protocol;

public interface MessageProtocol {

    byte[] marshallingRequest(Request request) throws Exception;
    Request unmarshallingRequest(byte[] bytes) throws Exception;
    byte[] marshallingResponse(Response response) throws Exception;
    Response unmarshallingReposne(byte[] bytes)throws Exception;

}
package com.example.demo.protocol;

import java.io.*;

/**
 * @author hehang on 2019-09-17
 * @description 基於jdk序列化的消息協議,我們也可以基於FastJSON序列化實現消息協議,甚至復雜的http協議等
 * jdk序列化時,被序列化的對象必須實現序列化接口,其內的屬性也必須實現
 */
public class JdkSerializeMessageProtocal implements MessageProtocol {

    public byte[] marshallingRequest(Request request) throws Exception {
        return serialize(request);
    }

    public Request unmarshallingRequest(byte[] bytes) throws Exception {
        return (Request) unserialize(bytes);
    }

    public byte[] marshallingResponse(Response response) throws Exception {
        return serialize(response);
    }

    public Response unmarshallingReposne(byte[] bytes) throws Exception{
        return (Response) unserialize(bytes);
    }

    private byte[] serialize(Object obj) throws Exception{
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(obj);
        return byteArrayOutputStream.toByteArray();
    }


    private Object unserialize(byte[] bytes) throws Exception{
        ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
        return objectInputStream.readObject();
    }
}

  服務發現

package com.example.demo.common;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

/**
 * @author hehang on 2019-09-17
 * @description zk序列化
 */
public class MyZkSerializer implements ZkSerializer {
    String charset = "UTF-8";

    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, charset);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }

    public byte[] serialize(Object obj) throws ZkMarshallingError {
        try {
            return String.valueOf(obj).getBytes(charset);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError(e);
        }
    }
}
package com.example.demo.discovery;

/**
 * @author hehang on 2019-09-17
 * @description 服務信息
 */
public class ServiceInfo {

    private String name;
    private String protocol;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getProtocol() {
        return protocol;
    }

    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
package com.example.demo.discovery;

import java.util.List;

public interface ServiceInfoDiscoverer {

    List<ServiceInfo> getServerInfos(String name);
}
package com.example.demo.discovery;

import com.alibaba.fastjson.JSON;
import com.example.demo.common.MyZkSerializer;
import com.example.demo.utils.PropertiesUtil;
import org.I0Itec.zkclient.ZkClient;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

/**
 * @author hehang on 2019-09-17
 * @description zk服務發現
 */
public class ZkServiceInfoDiscoverer implements ServiceInfoDiscoverer {
    private ZkClient zkClient;

    private String rootPath = "/rpc";

    public ZkServiceInfoDiscoverer(){
        String zkAddress = PropertiesUtil.getValue("zk.address");
        zkClient = new ZkClient(zkAddress);
        zkClient.setZkSerializer(new MyZkSerializer());
    }

    public List<ServiceInfo>  getServerInfos(String name) {
        String path = rootPath +"/"+ name +"/service";
        List<String> children = zkClient.getChildren(path);
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String child : children) {
            try {
                String decode = URLDecoder.decode(child,"UTF-8");
                ServiceInfo serviceInfo = JSON.parseObject(decode,ServiceInfo.class);
                serviceInfos.add(serviceInfo);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        return serviceInfos;
    }
}

  網絡通信客戶端

package com.example.demo.client.net;

import com.example.demo.discovery.ServiceInfo;

public interface NetClient {

    byte[] sentRequest(byte[] bytes, ServiceInfo serviceInfo) throws Throwable;
}
package com.example.demo.client.net;

import com.example.demo.discovery.ServiceInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

/**
 * @author hehang on 2019-09-17
 * @description netty實現底層通信,也可以利用bio、原生nio等實現
 */
public class NettyNetClient implements NetClient {
    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    public byte[] sentRequest(final byte[] bytes, ServiceInfo serviceInfo) throws Throwable {
        String[] addreddInfoArray = serviceInfo.getAddress().split(":");
        final SendHandler sendHandler = new SendHandler(bytes);
        byte[] respData = null;
        // 配置客戶端
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(sendHandler);
                        }
                    });

            // 啟動客戶端連接
            b.connect(addreddInfoArray[0], Integer.valueOf(addreddInfoArray[1])).sync();
            respData = (byte[]) sendHandler.responseData();
            logger.info("收到響應消息: " + respData);

        } finally {
            // 釋放線程組資源
            group.shutdownGracefully();
        }

        return respData;
    }



    private class SendHandler extends ChannelInboundHandlerAdapter {

        private CountDownLatch cdl = null;
        private Object responseMsg = null;
        private byte[] data;

        public SendHandler(byte[] bytes){
            cdl = new CountDownLatch(1);
            data = bytes;
        }

        @Override
        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            logger.info("連接服務端成功");
            ByteBuf reqBuf = Unpooled.buffer(data.length);
            reqBuf.writeBytes(data);
            logger.info("客戶端發送消息:" + reqBuf);
            channelHandlerContext.writeAndFlush(reqBuf);

        }

        public Object responseData(){
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return responseMsg;
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("client sub 讀取到響應信息:" + msg);
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] resp = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(resp);
            responseMsg = resp;
            cdl.countDown();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            logger.error("發生異常",cause);
            ctx.close();
        }


    }
}

  JDK動態代理生成代理類

package com.example.demo.client;

import com.example.demo.client.net.NetClient;
import com.example.demo.discovery.ServiceInfo;
import com.example.demo.discovery.ServiceInfoDiscoverer;
import com.example.demo.protocol.MessageProtocol;
import com.example.demo.protocol.Request;
import com.example.demo.protocol.Response;
import org.omg.CORBA.OBJ_ADAPTER;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * @author hehang on 2019-09-17
 * @description 代理工廠
 */
public class ClientSubProxyFactory {

    private ServiceInfoDiscoverer serviceInfoDiscoverer;

    private Map<String, MessageProtocol> supportMessageprotocol ;

    private NetClient netClient;

    private Map<Class<?>, Object> objectCahce = new HashMap<Class<?>, Object>();

    public <T> T getProxy(Class<?> interf){
        T object = (T) this.objectCahce.get(interf);
        if(object==null){
            object = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[]{interf}, new ClientStubInvocationHandler(interf));
            this.objectCahce.put(interf,object);
        }
        return  object;
    }

    public ServiceInfoDiscoverer getServiceInfoDiscoverer() {
        return serviceInfoDiscoverer;
    }

    public void setServiceInfoDiscoverer(ServiceInfoDiscoverer serviceInfoDiscoverer) {
        this.serviceInfoDiscoverer = serviceInfoDiscoverer;
    }

    public Map<String, MessageProtocol> getSupportMessageprotocol() {
        return supportMessageprotocol;
    }

    public void setSupportMessageprotocol(Map<String, MessageProtocol> supportMessageprotocol) {
        this.supportMessageprotocol = supportMessageprotocol;
    }

    public NetClient getNetClient() {
        return netClient;
    }

    public void setNetClient(NetClient netClient) {
        this.netClient = netClient;
    }

    public Map<Class<?>, Object> getObjectCahce() {
        return objectCahce;
    }

    public void setObjectCahce(Map<Class<?>, Object> objectCahce) {
        this.objectCahce = objectCahce;
    }

    private class ClientStubInvocationHandler implements InvocationHandler{

        private Class<?> interf;

        private Random random = new Random();

        public ClientStubInvocationHandler(Class<?> interf){
            super();
            this.interf = interf;
        }


        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

            if (method.getName().equals("toString")) {
                return proxy.getClass().toString();
            }

            if (method.getName().equals("hashCode")) {
                return 0;
            }
            //得到服務信息
            String serviceName = interf.getName();
            List<ServiceInfo> serviceInfos = serviceInfoDiscoverer.getServerInfos(serviceName);
            if(serviceInfos==null && serviceInfos.size()==0){
                throw  new Exception("服務不存在");
            }
            //隨機選擇一個服務
            ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));
            //構造請求信息
            Request request = new Request();
            request.setServiceName(serviceName);
            request.setMethod(method.getName());
            request.setPrameterTypes(method.getParameterTypes());
            request.setParameters(args);
            //消息編組
            MessageProtocol messageProtocol = supportMessageprotocol.get(serviceInfo.getProtocol());
            byte[] bytes = messageProtocol.marshallingRequest(request);
            //發送請求
            byte[] rpsBytes = netClient.sentRequest(bytes, serviceInfo);
            //消息解組
            Response response = messageProtocol.unmarshallingReposne(rpsBytes);

            if(response.getException()!=null){
                throw  response.getException();
            }
            return response.getReturnValue();
        }
    }
}

  服務注冊

package com.example.demo.register;

/**
 * @author hehang on 2019-09-17
 * @description 服務object
 */
public class ServiceObject {

    private String name;

    private Class<?> interf;

    private Object obj;

    public ServiceObject(String name, Class<?> interf, Object obj) {
        super();
        this.name = name;
        this.interf = interf;
        this.obj = obj;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Class<?> getInterf() {
        return interf;
    }

    public void setInterf(Class<?> interf) {
        this.interf = interf;
    }

    public Object getObj() {
        return obj;
    }

    public void setObj(Object obj) {
        this.obj = obj;
    }

}
package com.example.demo.register;

import java.net.UnknownHostException;

public interface ServiceRegister {

    void register (ServiceObject serviceObject,String protocol,int port) throws Exception;

    ServiceObject getServiceObject(String name);
}
package com.example.demo.register;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author hehang on 2019-09-17
 * @description
 */
public class DefaultServiceRegister implements ServiceRegister {

    private Map<String,ServiceObject> map = new HashMap<String, ServiceObject>();
    public void register(ServiceObject serviceObject, String protocol, int port) throws Exception {
        if(serviceObject==null){
            throw new IllegalArgumentException("參數不能為空");
        }
        map.put(serviceObject.getName(),serviceObject);
    }

    public ServiceObject getServiceObject(String name) {
        return map.get(name);
    }
}
package com.example.demo.register;

import com.alibaba.fastjson.JSON;
import com.example.demo.common.MyZkSerializer;
import com.example.demo.discovery.ServiceInfo;
import com.example.demo.utils.PropertiesUtil;
import org.I0Itec.zkclient.ZkClient;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.net.UnknownHostException;

/**
 * @author hehang on 2019-09-17
 * @description
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {

    private ZkClient zkClient;

    private String rootPath ="/rpc";

    public ZookeeperExportServiceRegister(){
        String addr = PropertiesUtil.getValue("zk.address");
        zkClient = new ZkClient(addr);
        zkClient.setZkSerializer(new MyZkSerializer());

    }


    @Override
    public void register(ServiceObject serviceObject, String protocol, int port) throws Exception {
        super.register(serviceObject, protocol, port);
        ServiceInfo serviceInfo = new ServiceInfo();
        String hostIp = InetAddress.getLocalHost().getHostAddress();
        String address = hostIp + ":" + port;
        serviceInfo.setAddress(address);
        serviceInfo.setName(serviceObject.getInterf().getName());
        serviceInfo.setProtocol(protocol);
        exportService(serviceInfo);

    }

    private void exportService(ServiceInfo serviceInfo ){
        String serviceName = serviceInfo.getName();
        String uri = JSON.toJSONString(serviceInfo);
        try {
            uri = URLEncoder.encode(uri,"UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String servicePath = rootPath + "/" + serviceName + "/service";
        if(!zkClient.exists(servicePath)){
            zkClient.createPersistent(servicePath,true);
        }
        String uriPath = servicePath + "/" + uri;
        if(zkClient.exists(uriPath)){
            zkClient.delete(uriPath);
        }
        zkClient.createEphemeral(uriPath);
    }
}

  網絡通信服務器

package com.example.demo.server;

import com.example.demo.protocol.MessageProtocol;
import com.example.demo.protocol.Request;
import com.example.demo.protocol.Response;
import com.example.demo.protocol.Status;
import com.example.demo.register.ServiceObject;
import com.example.demo.register.ServiceRegister;

import java.lang.reflect.Method;

/**
 * @author hehang on 2019-09-17
 * @description 請求處理類
 */
public class RequestHandler {


    private MessageProtocol messageProtocol;

    private ServiceRegister serviceRegister;

    public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
        super();
        this.messageProtocol = protocol;
        this.serviceRegister = serviceRegister;
    }

    public byte[] handlerRequest(byte[] data) throws Exception {
        //解組消息
        Request request = messageProtocol.unmarshallingRequest(data);
        //獲取處理對象
        ServiceObject serviceObject = serviceRegister.getServiceObject(request.getServiceName());
        Response rsp = null;
        if (serviceObject == null) {
            rsp = new Response(Status.NOT_FOUND);
        } else {
            //利用反射調用
            try {
                Method method = serviceObject.getInterf().getMethod(request.getMethod(), request.getPrameterTypes());
                Object obj = method.invoke(serviceObject.getObj(), request.getParameters());
                rsp = new Response(Status.SUCCESS);
                rsp.setReturnValue(obj);
            } catch (Exception e) {
                rsp = new Response(Status.ERROR);
                rsp.setException(e);
            }

        }

        //編組消息
        return messageProtocol.marshallingResponse(rsp);

    }

    public MessageProtocol getMessageProtocol() {
        return messageProtocol;
    }

    public void setMessageProtocol(MessageProtocol messageProtocol) {
        this.messageProtocol = messageProtocol;
    }

    public ServiceRegister getServiceRegister() {
        return serviceRegister;
    }

    public void setServiceRegister(ServiceRegister serviceRegister) {
        this.serviceRegister = serviceRegister;
    }
}
package com.example.demo.server;

public abstract class RpcServer {

    protected int port;

    protected String protocol;

    protected RequestHandler handler;

    public RpcServer(int port, String protocol, RequestHandler handler) {
        super();
        this.port = port;
        this.protocol = protocol;
        this.handler = handler;
    }
    /**
     * 開啟服務
     */
    public abstract void start();

    /**
     * 停止服務
     */
    public abstract void stop();

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getProtocol() {
        return protocol;
    }

    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public RequestHandler getHandler() {
        return handler;
    }

    public void setHandler(RequestHandler handler) {
        this.handler = handler;
    }



}
package com.example.demo.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * @author hehang on 2019-09-17
 * @description
 */
public class NettyRpcServer extends RpcServer {

    private Channel channel;

    public NettyRpcServer(int port, String protocol, RequestHandler handler) {
        super(port, protocol, handler);
    }

    private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);


    @Override
    public void start() {
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workLoopGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossLoopGroup,workLoopGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline p = socketChannel.pipeline();
                        p.addLast(new ChannelRequestHandler());
                    }
                });
        try {
            //啟動服務
            ChannelFuture future = serverBootstrap.bind(port).sync();
            logger.info("綁定成功");
            channel = future.channel();
            // 等待服務通道關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 釋放線程組資源
            bossLoopGroup.shutdownGracefully();
            workLoopGroup.shutdownGracefully();

        }

    }

    @Override
    public void stop() {
        this.channel.close();
    }



    private class ChannelRequestHandler extends ChannelInboundHandlerAdapter{


        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("激活");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("服務端收到消息:" + msg);
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] req = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(req);
            byte[] res = handler.handlerRequest(req);
            logger.info("服務端對消息:" + msg +"響應");
            ByteBuf rpsBuf = Unpooled.buffer(res.length);
            rpsBuf.writeBytes(res);
            ctx.write(rpsBuf);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            logger.error("發生異常:" + cause.getMessage());
            ctx.close();
        }


    }
}

  測試代碼

package com.example.demo.main;

/**
 * @author hehang on 2019-09-17
 * @description demo
 */
public interface DemoService {

    String sayHello(String param);
}
package com.example.demo.main;

import com.example.demo.client.ClientSubProxyFactory;
import com.example.demo.client.net.NettyNetClient;
import com.example.demo.discovery.ZkServiceInfoDiscoverer;
import com.example.demo.protocol.JdkSerializeMessageProtocal;
import com.example.demo.protocol.MessageProtocol;

import java.util.HashMap;
import java.util.Map;

/**
 * @author hehang on 2019-09-17
 * @description 客戶端測試類
 */
public class Consume {

    public static void main(String[] args) {
        ClientSubProxyFactory clientSubProxyFactory = new ClientSubProxyFactory();
        clientSubProxyFactory.setServiceInfoDiscoverer(new ZkServiceInfoDiscoverer());
        Map<String, MessageProtocol> messageProtocolMap = new HashMap<String, MessageProtocol>();
        messageProtocolMap.put("jdks",new JdkSerializeMessageProtocal());
        clientSubProxyFactory.setSupportMessageprotocol(messageProtocolMap);
        clientSubProxyFactory.setNetClient(new NettyNetClient());
        DemoService demoService = clientSubProxyFactory.getProxy(DemoService.class);
        String result = demoService.sayHello("hello");
        System.out.println(result);

    }
}
package com.example.demo.main.impl;

import com.example.demo.main.DemoService;

/**
 * @author hehang on 2019-09-17
 * @description
 */
public class DemoServiceImpl implements DemoService {
    public String sayHello(String param) {
        return param + "word";
    }
}
package com.example.demo.main;

import com.example.demo.main.impl.DemoServiceImpl;
import com.example.demo.protocol.JdkSerializeMessageProtocal;
import com.example.demo.register.ServiceObject;
import com.example.demo.register.ServiceRegister;
import com.example.demo.register.ZookeeperExportServiceRegister;
import com.example.demo.server.NettyRpcServer;
import com.example.demo.server.RequestHandler;
import com.example.demo.server.RpcServer;
import com.example.demo.utils.PropertiesUtil;

/**
 * @author hehang on 2019-09-17
 * @description
 */
public class Provider {

    public static void main(String[] args) throws Exception {

        int port = Integer.parseInt(PropertiesUtil.getValue("rpc.port"));
        String protocol = PropertiesUtil.getValue("rpc.protocol");
        ServiceRegister serviceRegister = new ZookeeperExportServiceRegister();
        DemoService demoService = new DemoServiceImpl();
        ServiceObject serviceObject = new ServiceObject(DemoService.class.getName(), DemoService.class, demoService);
        serviceRegister.register(serviceObject, protocol, port);
        RequestHandler requestHandler = new RequestHandler(new JdkSerializeMessageProtocal(), serviceRegister);
        RpcServer rpcServer = new NettyRpcServer(port, protocol, requestHandler);
        rpcServer.start();
        System.in.read();
        rpcServer.stop();


    }
}

  配置文件讀取工具類及配置

package com.example.demo.utils;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author hehang on 2019-09-17
 * @description 讀取配置文件
 */
public class PropertiesUtil {

    private static Properties properties;
    static{

        properties = new Properties();
        try {
            properties.load(PropertiesUtil.class.getClassLoader().getResourceAsStream("app.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getValue(String key){
        return (String) properties.get(key);
    }

}
zk.address=127.0.0.1:2181
rpc.port=15002
rpc.protocol=jdks
log4j.rootLogger=info,stdout
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %T %-5p %c{2} (%F:%M(%L)) - %m%n

  相關依賴

<dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.32.Final</version>
        </dependency>
        <!-- SLF4J -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM