netty實現遠程調用RPC功能


netty實現遠程調用RPC功能

PRC的功能一句話說白了,就是遠程調用其他電腦的api

依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.31.Final</version>
</dependency>

服務端功能模塊編寫

  1. 項目結構

  2. ClassInfo

    package test;
    
    import java.io.Serializable;
    
    public class ClassInfo implements Serializable {
    
        private static final long serialVersionUID = -8970942815543515064L;
        
        private String className;//類名
        private String methodName;//函數名稱
        private Class<?>[] types;//參數類型  
        private Object[] objects;//參數列表  
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getMethodName() {
            return methodName;
        }
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
        public Class<?>[] getTypes() {
            return types;
        }
        public void setTypes(Class<?>[] types) {
            this.types = types;
        }
        public Object[] getObjects() {
            return objects;
        }
        public void setObjects(Object[] objects) {
            this.objects = objects;
        }
    }
    
  3. InvokerHandler

    package test;
    
    import java.lang.reflect.Method;
    import java.util.concurrent.ConcurrentHashMap;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class InvokerHandler extends ChannelInboundHandlerAdapter {
        public static ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<String,Object>();
        @Override  
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
            ClassInfo classInfo = (ClassInfo)msg;
            Object claszz = null;
            if(!classMap.containsKey(classInfo.getClassName())){
                try {
                    claszz = Class.forName(classInfo.getClassName()).getDeclaredConstructor().newInstance();
                    classMap.put(classInfo.getClassName(), claszz);
                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }else {
                claszz = classMap.get(classInfo.getClassName());
            }
            Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());  
            Object result = method.invoke(claszz, classInfo.getObjects()); 
            System.out.println("服務器接受客戶端的數據" + result);
            ctx.write("SeverSendData");
            ctx.flush();  
            ctx.close();
        }  
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
            cause.printStackTrace();  
            ctx.close();  
        }  
    
    }
    
  4. RPCServer

    package test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    public class RPCServer {
        private int port;
    
        public RPCServer(int port) {
            this.port = port;
        }
    
        public void start() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = 
                        new ServerBootstrap()
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class).localAddress(port)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                pipeline.addLast(new LengthFieldPrepender(4));
                                pipeline.addLast("encoder", new ObjectEncoder());
                                pipeline.addLast("decoder",
                                        new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast(new InvokerHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture future = serverBootstrap.bind(port).sync();
                System.out.println("Server start listen at " + port);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port;
            if (args.length > 0) {
                port = Integer.parseInt(args[0]);
            } else {
                port = 8080;
            }
            new RPCServer(port).start();
        }
    }
    
  5. SeverClass

    package test;
    
    public class SeverClass implements SeverInterface {
    
        @Override
        public String fn(String data) {
            return data;
        }
    
    }
    
  6. SeverInterface

    package test;
    
    public interface SeverInterface {
        String fn(String data);
    }
    

客戶端功能模塊編寫

  1. 目錄結構

  2. App

    package test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    public class App 
    {
        public static void main(String [] args) throws Throwable{
            
            ClassInfo classInfo = new ClassInfo();
            classInfo.setClassName("test.SeverClass");
            classInfo.setMethodName("fn");
            classInfo.setObjects(new Object[]{"ClientSendData"});
            classInfo.setTypes(new Class[]{String.class});
            
            ResultHandler resultHandler = new ResultHandler();
            EventLoopGroup group = new NioEventLoopGroup();  
            try {  
                Bootstrap b = new Bootstrap();  
                b.group(group)  
                    .channel(NioSocketChannel.class)  
                    .option(ChannelOption.TCP_NODELAY, true)  
                    .handler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        public void initChannel(SocketChannel ch) throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();  
                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));  
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));  
                            pipeline.addLast("encoder", new ObjectEncoder());    
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));  
                            pipeline.addLast("handler",resultHandler);
                        }  
                    });  
        
                ChannelFuture future = b.connect("localhost", 8080).sync();  
                future.channel().writeAndFlush(classInfo).sync();
                future.channel().closeFuture().sync();  
            } finally {  
                group.shutdownGracefully();  
            }
    
            System.out.println("調用服務器數據返回回來的數據:" + resultHandler.getResponse());
        }
    }
    
  3. ResultHandler

    package test;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ResultHandler extends ChannelInboundHandlerAdapter {
    
        private Object response;
    
        public Object getResponse() {
            return response;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response = msg;
            System.out.println("調用服務器數據返回回來的數據:" + response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("client exception is general");
        }
    }
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM