前言
Rpc( Remote procedure call):是一種請求 - 響應協議。RPC由客戶端啟動,客戶端向已知的遠程服務器發送請求消息,以使用提供的參數執行指定的過程。遠程服務器向客戶端發送響應,應用程序繼續其進程。當服務器正在處理該調用時,客戶端被阻塞(它等待服務器在恢復執行之前完成處理),除非客戶端向服務器發送異步請求,例如XMLHttpRequest。在各種實現中存在許多變化和細微之處,導致各種不同(不兼容)的RPC協議。
技術選型:
- Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 文件。
- Netty:基於NIO的網絡編程框架,封裝了NIO細節,使用更加方便
- SpringBoot:Spring 的組件的集合,內部封裝服務器,實現了自動加載
1.封裝請求的pojo和響應的pojo
public class RpcRequest { public RpcRequest() { } private Long id; /** * rpc name */ private String className; /** * 方法名 */ private String methodName; /** * 參數 */ private HashMap<Class<?>, Object> arguments; //get and set ...
public class RpcResponse { public RpcResponse() { } private Long id; private Integer code; private Object result; private String failMsg; // get and set ...
2.server端對request進行解碼,對response進行編碼。反之client端對request進行編碼,對response進行解碼,因此需要編寫兩個編碼和解碼器,在不同端,對不同pojo進行編碼解碼
編碼類只對屬於某個 genericClass的類進行編碼,SerializationUtil為使用Protobuffer工具封裝的一個工具類
@ChannelHandler.Sharable public class RpcEncode extends MessageToByteEncoder { //client 端為 request, server 端為 response private Class<?> genericClass; public RpcEncode(Class<?> clazz) { this.genericClass = clazz; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if (genericClass.isInstance(o)) { byte[] data = SerializationUtil.serialize(o); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } } }
同樣的,解碼
public class RpcDecode extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecode(Class<?> clazz) { this.genericClass = clazz; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int dataLength = in.readInt(); //一個整數4個字節 if (dataLength < 4) { return; } in.markReaderIndex(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.deserialize(data, genericClass); out.add(obj); }
3. server端將數據解碼后,開始使用handler處理client的請求,handler里包含一個map,里面value是使用@RpcService后的bean,key是注解的value,通過RpcRequest的className,從map的key進行匹配,找到bean之后,通過反射執行 methodName對應的方法 和arguments的參數
@RpcService用於標識在發布的服務類上,value為client 請求的classname,該注解繼承了@Component注解,將會被spring注冊為bean
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { String value(); String description() default ""; }
通過@RpcService 標識的類,將被注冊為bean實例,這里將在 LrpcHandlerAutoConfiguration類中,將這些標識了該注解的bean實例找出來,傳入handler中執行client的請求方法
@Configurable @Component public class LrpcHandlerAutoConfiguration implements ApplicationContextAware { private ApplicationContext context; @Value("${lrpc.server}") public String port; @Bean public RpcHandler rpcHandler() { Map<String, Object> rpcBeans = context.getBeansWithAnnotation(RpcService.class); Set<String> beanNameSet = rpcBeans.keySet(); for (String beanName : beanNameSet) { Object obj = rpcBeans.get(beanName); //rpcService注解會 把value的值傳遞給component RpcService annotation = obj.getClass().getDeclaredAnnotation(RpcService.class); //默認bean name if (StringUtils.isBlank(annotation.value()) || annotation.value().equals(beanName)) { continue; } rpcBeans.put(annotation.value(), rpcBeans.get(beanName)); //去掉重復 rpcBeans.remove(beanName); } return new RpcHandler(rpcBeans); } //..........................
RpcHandler的構造函數,注入了一份rpcBeans的引用,當client的RpcRequest請求時,將從該rpcBeans中獲取對應的bean
@ChannelHandler.Sharable public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> { private static final Logger logger = Logger.getLogger(RpcHandler.class.getName()); private Map<String, Object> rpcBeans; public RpcHandler(Map<String, Object> rpcBeans) { this.rpcBeans = rpcBeans; } @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { RpcResponse rpcResponse = handle(msg); ctx.channel().writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE); } private RpcResponse handle(RpcRequest msg) throws InvocationTargetException { RpcResponse rpcResponse = new RpcResponse(); Object obj = rpcBeans.get(msg.getClassName()); //TODO 暫時這樣吧 if (Objects.isNull(obj)) { System.out.println("未找到service"); rpcResponse.setResult(null); rpcResponse.setCode(404); logger.warning("請求的service未找到,msg:" + msg.toString()); return rpcResponse; } rpcResponse.setId(msg.getId()); //解析請求,執行相應的rpc方法 Class<?> clazz = obj.getClass(); String methodName = msg.getMethodName(); HashMap<Class<?>, Object> arguments = msg.getArguments(); FastClass fastClass = FastClass.create(clazz); FastMethod method = fastClass.getMethod(methodName, arguments.keySet().toArray(new Class[arguments.size()])); Object result = method.invoke(obj, arguments.values().toArray()); rpcResponse.setResult(result); rpcResponse.setCode(200); return rpcResponse; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); logger.warning(cause.toString()); } }
4. 啟動 LrpcServer,LrpcChannelInit 在 LrpcHandlerAutoConfiguration中進行初始化,同時注入 lrpc.server 環境變量給port參數
@Component public class LrpcServerImpl implements LrpcServer, ApplicationListener<ApplicationReadyEvent> { @Autowired LrpcChannelInit lrpcChannelInit; @Override public void connect() { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .handler(new LoggingHandler(LogLevel.INFO)) .channel(NioServerSocketChannel.class) .childHandler(lrpcChannelInit) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.bind(new InetSocketAddress(lrpcChannelInit.getPort())).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void onApplicationEvent(ApplicationReadyEvent event) { connect(); } }
5. 客戶端handler類 LrpClientHandler,里面持一把對象鎖,因為netty返回數據總是異步的,這里將異步轉成同步,利用 Object的wait()和notify()方法實現,LrpClientHandler這里是多例的,不存在競爭狀態,因此是線程安全的
public class LrpClientHandler extends SimpleChannelInboundHandler<RpcResponse> { private final Object lock = new Object(); private volatile RpcResponse rpcResponse = null; @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception { rpcResponse = msg; synchronized (lock) { lock.notifyAll(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } public Object getLock() { return lock; } public RpcResponse getRpcResponse() { return rpcResponse; } public void setRpcResponse(RpcResponse rpcResponse) { this.rpcResponse = rpcResponse; } }
6. LrpcClientChannelInit 中持有一個LrpcClientHandler的引用,在初始化該類時同時初始化LrpcClientHandler
public class LrpcClientChannelInit extends ChannelInitializer { private LrpClientHandler lrpClientHandler; public LrpcClientChannelInit() { lrpClientHandler = new LrpClientHandler(); } @Override protected void initChannel(Channel ch) { //請求加密 ch.pipeline().addLast(new RpcEncode(RpcRequest.class)) .addLast(new RpcDecode(RpcResponse.class)) .addLast(new LoggingHandler(LogLevel.INFO)) .addLast(lrpClientHandler); } public synchronized void initHandler(LrpClientHandler lrpClientHandler){ this.lrpClientHandler = lrpClientHandler; } public LrpClientHandler getLrpClientHandler() { return lrpClientHandler; } public void setLrpClientHandler(LrpClientHandler lrpClientHandler) { this.lrpClientHandler = lrpClientHandler; } }
7. 持有執行遠程方法的host和port,execute(RpcRequest r)中連接,傳遞參數
public class LrpcExecutorImpl implements LrpcExecutor { private String host; private Integer port; public LrpcExecutorImpl(String host, Integer port) { this.host = host; this.port = port; } @Override public RpcResponse execute(RpcRequest rpcRequest) { LrpcClientChannelInit lrpcClientChannelInit = new LrpcClientChannelInit(); Bootstrap b = new Bootstrap(); EventLoopGroup group = null; ChannelFuture future = null; try { group = new NioEventLoopGroup(); b.group(group) .channel(NioSocketChannel.class) .handler(lrpcClientChannelInit) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .option(ChannelOption.SO_KEEPALIVE, true); future = b.connect(new InetSocketAddress(host, port)).sync(); //TODO 連接好了直接發送消息,同步則阻塞等待通知 future.channel().writeAndFlush(rpcRequest).sync(); Object lock = lrpcClientChannelInit.getLrpClientHandler().getLock(); synchronized (lock) { lock.wait(); } RpcResponse rpcResponse = lrpcClientChannelInit.getLrpClientHandler().getRpcResponse(); if (null != rpcResponse) { future.channel().closeFuture().sync(); } return rpcResponse; } catch (Exception e) { e.printStackTrace(); } finally { lrpcClientChannelInit.getLrpClientHandler().setRpcResponse(null); if (null != group) { group.shutdownGracefully(); } } return null; } //get and set ... }
8.使用實例
Server端發布服務
@RpcService("HelloService") public class HelloServiceImpl implements HelloService { @Override public String say(String msg) { return "Hello Word!" + msg; } }
在application.properties中,注入環境變量:
#
lrpc.server=8888
Client 配置服務地址和LrpcExecutor
lrpc.hello.host=xxxxxxxxxxxxxxxxxxx lrpc.hello.port=8888 lrpc.hello.desc=hello rpc調用
配置調用服務執行器 LrpcExecutor,保存在spring 容器bean里,可通過依賴注入進行調用
@Configuration @Component public class RpcConfiguration { @Bean("rpc.hello") @ConfigurationProperties(prefix = "lrpc.hello") public RpcServerProperties rpcClientCallProperties() { return new RpcServerProperties(); } @Bean("helloRpcExecutor") LrpcExecutor lrpcExecutor(@Qualifier(value = "rpc.hello") RpcServerProperties rpcServerProperties) { return invoke(rpcServerProperties); } private LrpcExecutor invoke(RpcServerProperties config) { return new LrpcExecutorImpl(config.getHost(), config.getPort()); } }
調用服務,methodName為ClassName對應類下的方法名:
@Autowired @Qualifier(value = "helloRpcExecutor") private LrpcExecutor helloRpcExecutor; @GetMapping("/say") public String invoke(String msg) { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName("HelloService"); rpcRequest.setMethodName("say"); rpcRequest.setId(111L); HashMap<Class<?>, Object> arguments = new HashMap<>(8); arguments.put(String.class, "good"); rpcRequest.setArguments(arguments); RpcResponse execute = helloRpcExecutor.execute(rpcRequest); System.out.println(execute.toString()); return execute.toString(); }
最后,以上為個人練手demo,未來有時間會把未完善的地方慢慢完善,最終目標是做成像dobble那樣的pj,如有好的意見或疑惑歡迎各位大佬指點(morty630@foxmail.com),附上 github完整代碼:https://github.com/To-echo/lrpc-all (你的點贊是我的動力)
相關技術文檔
objenesis反射工具:http://objenesis.org/details.html
Protobuf 協議:https://developers.google.com/protocol-buffers/
Protobuffer序列化工具:https://github.com/protostuff/protostuff
RPC介紹:https://en.wikipedia.org/wiki/Remote_procedure_call
Netty官網:https://netty.io/