一、學習本文你能學到什么?
- RPC的概念及運作流程
- RPC協議及RPC框架的概念
- Netty的基本使用
- Java序列化及反序列化技術
- Zookeeper的基本使用(注冊中心)
- 自定義注解實現特殊業務邏輯
- Java的動態代理
- 自定義Spring Boot Starter
這里只是列出了你能從RPC框架源碼中能學到的東西,本文並不會每個知識點都點到,主要講述如何手寫一個RPC框架,更多細節需要讀者閱讀源碼,文章的下方會提供源碼鏈接哦。
二、RPC基礎知識
2.1 RPC是什么?
Remote Procedure Call(RPC):遠程過程調用。
過程是什么?
過程就是業務處理、計算任務,更直白理解,就是程序。(像調用本地方法一樣調用遠程的過程。)
RPC采用Client-Server結構,通過Request-Response消息模式實現。
2.2 RPC的流程
- 客戶端處理過程中
調用
Client stub(就像調用本地方法一樣),傳遞參數; - Client stub將參數
編組
為消息,然后通過系統調用向服務端發送消息; - 客戶端本地操作系統將消息從客戶端機器
發送
到服務端機器; - 服務端操作系統將接收到的數據包
傳遞
給Server stub; - Server stub
解組
消息為參數; - Server stub
再調用
服務端的過程,過程執行結果以反方向的相同步驟響應給客戶端。
2.3 RPC流程中需要處理的問題
- Client stub、Server stub的開發;
- 參數如何編組為消息,以及解組消息;
- 消息如何發送;
- 過程結果如何表示、異常情況如何處理;
- 如何實現安全的訪問控制。
2.4 RPC協議是什么?
RPC調用過程中需要將參數編組為消息進行發送,接受方需要解組消息為參數,過程處理結果同樣需要經編組、解組。消息由哪些部分構成及消息的表示形式就構成了消息協議。
RPC調用過程中采用的消息協議稱為RPC協議
RPC協議規定請求、響應消息的格式
在TCP(網絡傳輸控制協議)上可選用或自定義消息協議來完成RPC消息交互
我們可以選用通用的標准協議(如:http、https),也也可根據自身的需要定義自己的消息協議。
2.5 RPC框架是什么?
封裝好參數編組、消息解組、底層網絡通信的RPC程序開發框架,帶來的便捷是可以直接在其基礎上只需要專注於過程代碼編寫。
Java領域:
- 傳統的webservice框架:Apache CXF、Apache Axis2、Java自帶的JAX-WS等。webservice框架大多基於標准的SOAP協議。
- 新興的微服務框架:Dubbo、spring cloud、Apache Thrift等。
三、手寫RPC
3.1 目標
我們將會寫一個簡易的RPC框架,暫且叫它leisure-rpc-spring-boot-starter
,通過在項目中引入該starter,並簡單的配置一下,項目即擁有提供遠程服務的能力。
編寫自定義注解@Service
,被它注解的類將會提供遠程服務。
編寫自定義注解@InjectService
,使用它可注入遠程服務。
3.2 項目整體結構
3.3 客戶端編寫
3.3.1 客戶端需要做什么?
客戶端想要調用遠程服務,必須具備服務發現的能力;在知道有哪些服務過后,還必須有服務代理來執行服務調用;客戶端想要與服務端通信,必須要有相同的消息協議;客戶端想要調用遠程服務,那么必須具備網絡請求的能力,即網絡層功能。
當然,這是客戶端所需的最基本的能力,其實還可以擴展的能力,例如負載均衡。
3.3.2 具體實現
我們先看看客戶端的代碼結構:
基於面向接口編程的理念,不同角色都實現了定義了相應規范的接口。這里面我們沒有發現消息協議相關內容,那是因為服務端也需要消息協議,因此抽離了出來,放在公共層。
3.3.2.1 服務發現者
/**
* 服務發現抽象類,定義服務發現規范
*
* @author 東方雨傾
* @since 1.0.0
*/
public interface ServiceDiscoverer {
List<Service> getServices(String name);
}
/**
* Zookeeper服務發現者,定義以Zookeeper為注冊中心的服務發現細則
*
* @author 東方雨傾
* @since 1.0.0
*/
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer {
private ZkClient zkClient;
public ZookeeperServiceDiscoverer(String zkAddress) {
zkClient = new ZkClient(zkAddress);
zkClient.setZkSerializer(new ZookeeperSerializer());
}
/**
* 使用Zookeeper客戶端,通過服務名獲取服務列表
* 服務名格式:接口全路徑
*
* @param name 服務名
* @return 服務列表
*/
@Override
public List<Service> getServices(String name) {
String servicePath = LeisureConstant.ZK_SERVICE_PATH + LeisureConstant.PATH_DELIMITER + name + "/service";
List<String> children = zkClient.getChildren(servicePath);
return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {
String deCh = null;
try {
deCh = URLDecoder.decode(str, LeisureConstant.UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return JSON.parseObject(deCh, Service.class);
}).collect(Collectors.toList());
}
}
服務發現者使用Zookeeper來實現,通過ZkClient我們很容易發現已經注冊在ZK上的服務。當然我們也可以使用其他組件作為注冊中心,例如Redis。
3.3.2.2 網絡客戶端
/**
* 網絡請求客戶端,定義網絡請求規范
*
* @author 東方雨傾
* @since 1.0.0
*/
public interface NetClient {
byte[] sendRequest(byte[] data, Service service) throws InterruptedException;
}
/**
* Netty網絡請求客戶端,定義通過Netty實現網絡請求的細則。
*
* @author 東方雨傾
* @since 1.0.0
*/
public class NettyNetClient implements NetClient {
private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
/**
* 發送請求
*
* @param data 請求數據
* @param service 服務信息
* @return 響應數據
* @throws InterruptedException 異常
*/
@Override
public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
String[] addInfoArray = service.getAddress().split(":");
String serverAddress = addInfoArray[0];
String serverPort = addInfoArray[1];
SendHandler sendHandler = new SendHandler(data);
byte[] respData;
// 配置客戶端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(sendHandler);
}
});
// 啟動客戶端連接
b.connect(serverAddress, Integer.parseInt(serverPort)).sync();
respData = (byte[]) sendHandler.rspData();
logger.info("SendRequest get reply: {}", respData);
} finally {
// 釋放線程組資源
group.shutdownGracefully();
}
return respData;
}
}
/**
* 發送處理類,定義Netty入站處理細則
*
* @author 東方雨傾
* @since 1.0.0
*/
public class SendHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(SendHandler.class);
private CountDownLatch cdl;
private Object readMsg = null;
private byte[] data;
public SendHandler(byte[] data) {
cdl = new CountDownLatch(1);
this.data = data;
}
/**
* 當連接服務端成功后,發送請求數據
*
* @param ctx 通道上下文
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("Successful connection to server:{}", ctx);
ByteBuf reqBuf = Unpooled.buffer(data.length);
reqBuf.writeBytes(data);
logger.info("Client sends message:{}", reqBuf);
ctx.writeAndFlush(reqBuf);
}
/**
* 讀取數據,數據讀取完畢釋放CD鎖
*
* @param ctx 上下文
* @param msg ByteBuf
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.info("Client reads message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] resp = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(resp);
readMsg = resp;
cdl.countDown();
}
/**
* 等待讀取數據完成
*
* @return 響應數據
* @throws InterruptedException 異常
*/
public Object rspData() throws InterruptedException {
cdl.await();
return readMsg;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
logger.error("Exception occurred:{}", cause.getMessage());
ctx.close();
}
}
在這里我們使用Netty來實現網絡請求客戶端,當然也可以使用Mina。網絡請求客戶端能連接遠程服務端,並將編組好的請求數據發送給服務端,待服務端處理好后,又將服務端的響應數據返回給客戶端。
3.3.2.3 服務代理
/**
* 客戶端代理工廠:用於創建遠程服務代理類
* 封裝編組請求、請求發送、編組響應等操作。
*
* @author 東方雨傾
* @since 1.0.0
*/
public class ClientProxyFactory {
private ServiceDiscoverer serviceDiscoverer;
private Map<String, MessageProtocol> supportMessageProtocols;
private NetClient netClient;
private Map<Class<?>, Object> objectCache = new HashMap<>();
/**
* 通過Java動態代理獲取服務代理類
*
* @param clazz 被代理類Class
* @param <T> 泛型
* @return 服務代理類
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) this.objectCache.computeIfAbsent(clazz,
cls -> newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new ClientInvocationHandler(cls)));
}
// getter setter ...
/**
* 客戶端服務代理類invoke函數細節實現
*/
private class ClientInvocationHandler implements InvocationHandler {
private Class<?> clazz;
private Random random = new Random();
public ClientInvocationHandler(Class<?> clazz) {
super();
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
if (method.getName().equals("toString")) {
return proxy.getClass().toString();
}
if (method.getName().equals("hashCode")) {
return 0;
}
// 1、獲得服務信息
String serviceName = this.clazz.getName();
List<Service> services = serviceDiscoverer.getServices(serviceName);
if (services == null || services.isEmpty()) {
throw new LeisureException("No provider available!");
}
// 隨機選擇一個服務提供者(軟負載均衡)
Service service = services.get(random.nextInt(services.size()));
// 2、構造request對象
LeisureRequest req = new LeisureRequest();
req.setServiceName(service.getName());
req.setMethod(method.getName());
req.setParameterTypes(method.getParameterTypes());
req.setParameters(args);
// 3、協議層編組
// 獲得該方法對應的協議
MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol());
// 編組請求
byte[] data = protocol.marshallingRequest(req);
// 4、調用網絡層發送請求
byte[] repData = netClient.sendRequest(data, service);
// 5解組響應消息
LeisureResponse rsp = protocol.unmarshallingResponse(repData);
// 6、結果處理
if (rsp.getException() != null) {
throw rsp.getException();
}
return rsp.getReturnValue();
}
}
}
服務代理類由客戶端代理工廠類產生,代理方式是基於Java的動態代理。在處理類ClientInvocationHandler的invoke函數中,定義了一系列的操作,包括獲取服務、選擇服務提供者、構造請求對象、編組請求對象、網絡請求客戶端發送請求、解組響應消息、異常處理等。
3.3.2.4 消息協議
/**
* 消息協議,定義編組請求、解組請求、編組響應、解組響應規范
*
* @author 東方雨傾
* @since 1.0.0
*/
public interface MessageProtocol {
/**
* 編組請求
*
* @param req 請求信息
* @return 請求字節數組
* @throws Exception 編組請求異常
*/
byte[] marshallingRequest(LeisureRequest req) throws Exception;
/**
* 解組請求
*
* @param data 請求字節數組
* @return 請求信息
* @throws Exception 解組請求異常
*/
LeisureRequest unmarshallingRequest(byte[] data) throws Exception;
/**
* 編組響應
*
* @param rsp 響應信息
* @return 響應字節數組
* @throws Exception 編組響應異常
*/
byte[] marshallingResponse(LeisureResponse rsp) throws Exception;
/**
* 解組響應
*
* @param data 響應字節數組
* @return 響應信息
* @throws Exception 解組響應異常
*/
LeisureResponse unmarshallingResponse(byte[] data) throws Exception;
}
/**
* Java序列化消息協議
*
* @author 東方雨傾
* @since 1.0.0
*/
public class JavaSerializeMessageProtocol implements MessageProtocol {
private byte[] serialize(Object obj) throws Exception {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bout);
out.writeObject(obj);
return bout.toByteArray();
}
@Override
public byte[] marshallingRequest(LeisureRequest req) throws Exception {
return this.serialize(req);
}
@Override
public LeisureRequest unmarshallingRequest(byte[] data) throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
return (LeisureRequest) in.readObject();
}
@Override
public byte[] marshallingResponse(LeisureResponse rsp) throws Exception {
return this.serialize(rsp);
}
@Override
public LeisureResponse unmarshallingResponse(byte[] data) throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
return (LeisureResponse) in.readObject();
}
}
消息協議主要是定義了客戶端如何編組請求、解組響應,服務端如何解組請求、編組響應這四個操作規范。本文提供了Java序列化與反序列化的實現,感興趣的讀者可以基於其他序列化技術實現其他消息協議(偷偷說一句:Java的序列化性能很不理想)。
3.4 服務端編寫
3.4.1 服務端需要做什么?
首先,服務端要提供遠程服務,必須具備服務注冊及暴露的能力;在這之后,還需要開啟網絡服務,供客戶端連接。有些項目可能既是服務提供者,又是服務消費者,那什么時候開啟服務,什么時候注入服務呢?這里我們引入一個RPC處理者的概念,由它來幫我們開啟服務,以及注入服務。
3.4.3 具體實現
先看看服務端的代碼結構:
服務端做的事情也很簡單,注冊服務並暴露服務,然后開啟網絡服務;如果服務端也是消費者,則注入遠程服務。
服務注冊和服務注入依賴兩個自定義注解來實現:
- @Service:注冊服務
- @InjectService:注入服務
下面是他們的實現代碼:
/**
* 被該注解標記的服務可提供遠程RPC訪問的能力
*
* @author 東方雨傾
* @since 1.0.0
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Service {
String value() default "";
}
/**
* 該注解用於注入遠程服務
*
* @author 東方雨傾
* @since 1.0.0
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface InjectService {
}
3.4.3.1 服務注冊(暴露)
/**
* 服務注冊器,定義服務注冊規范
*
* @author 東方雨傾
* @since 1.0.0
*/
public interface ServiceRegister {
void register(ServiceObject so) throws Exception;
ServiceObject getServiceObject(String name) throws Exception;
}
/**
* 默認服務注冊器
*
* @author 東方雨傾
* @since 1.0.0
*/
public class DefaultServiceRegister implements ServiceRegister {
private Map<String, ServiceObject> serviceMap = new HashMap<>();
protected String protocol;
protected Integer port;
@Override
public void register(ServiceObject so) throws Exception {
if (so == null) {
throw new IllegalArgumentException("Parameter cannot be empty.");
}
this.serviceMap.put(so.getName(), so);
}
@Override
public ServiceObject getServiceObject(String name) {
return this.serviceMap.get(name);
}
}
/**
* Zookeeper服務注冊器,提供服務注冊、服務暴露的能力
*
* @author 東方雨傾
* @since 1.0.0
*/
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {
/**
* Zk客戶端
*/
private ZkClient client;
public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) {
client = new ZkClient(zkAddress);
client.setZkSerializer(new ZookeeperSerializer());
this.port = port;
this.protocol = protocol;
}
/**
* 服務注冊
*
* @param so 服務持有者
* @throws Exception 注冊異常
*/
@Override
public void register(ServiceObject so) throws Exception {
super.register(so);
Service service = new Service();
String host = InetAddress.getLocalHost().getHostAddress();
String address = host + ":" + port;
service.setAddress(address);
service.setName(so.getClazz().getName());
service.setProtocol(protocol);
this.exportService(service);
}
/**
* 服務暴露
*
* @param serviceResource 需要暴露的服務信息
*/
private void exportService(Service serviceResource) {
String serviceName = serviceResource.getName();
String uri = JSON.toJSONString(serviceResource);
try {
uri = URLEncoder.encode(uri, UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = ZK_SERVICE_PATH + PATH_DELIMITER + serviceName + "/service";
if (!client.exists(servicePath)) {
client.createPersistent(servicePath, true);
}
String uriPath = servicePath + PATH_DELIMITER + uri;
if (client.exists(uriPath)) {
client.delete(uriPath);
}
client.createEphemeral(uriPath);
}
}
這個過程其實沒啥好說的,就是將指定ServiceObject對象序列化后保存到ZK上,供客戶端發現。同時會將服務對象緩存起來,在客戶端調用服務時,通過緩存的ServiceObject對象反射指定服務,調用方法。
3.4.3.2 網絡服務
/**
* RPC服務端抽象類
*
* @author 東方雨傾
* @since 1.0.0
*/
public abstract class RpcServer {
/**
* 服務端口
*/
protected int port;
/**
* 服務協議
*/
protected String protocol;
/**
* 請求處理者
*/
protected RequestHandler handler;
public RpcServer(int port, String protocol, RequestHandler handler) {
super();
this.port = port;
this.protocol = protocol;
this.handler = handler;
}
/**
* 開啟服務
*/
public abstract void start();
/**
* 停止服務
*/
public abstract void stop();
// getter setter ...
}
/**
* Netty RPC服務端,提供Netty網絡服務開啟、關閉的能力
*
* @author 東方雨傾
* @since 1.0.0
*/
public class NettyRpcServer extends RpcServer {
private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
private Channel channel;
public NettyRpcServer(int port, String protocol, RequestHandler handler) {
super(port, protocol, handler);
}
@Override
public void start() {
// 配置服務器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new ChannelRequestHandler());
}
});
// 啟動服務
ChannelFuture f = b.bind(port).sync();
logger.info("Server started successfully.");
channel = f.channel();
// 等待服務通道關閉
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放線程組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void stop() {
this.channel.close();
}
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("Channel active:{}", ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("The server receives a message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] req = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(req);
byte[] res = handler.handleRequest(req);
logger.info("Send response:{}", msg);
ByteBuf respBuf = Unpooled.buffer(res.length);
respBuf.writeBytes(res);
ctx.write(respBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
logger.error("Exception occurred:{}", cause.getMessage());
ctx.close();
}
}
}
/**
* 請求處理者,提供解組請求、編組響應等操作
*
* @author 東方雨傾
* @since 1.0.0
*/
public class RequestHandler {
private MessageProtocol protocol;
private ServiceRegister serviceRegister;
public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
super();
this.protocol = protocol;
this.serviceRegister = serviceRegister;
}
public byte[] handleRequest(byte[] data) throws Exception {
// 1、解組消息
LeisureRequest req = this.protocol.unmarshallingRequest(data);
// 2、查找服務對象
ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
LeisureResponse rsp = null;
if (so == null) {
rsp = new LeisureResponse(LeisureStatus.NOT_FOUND);
} else {
// 3、反射調用對應的過程方法
try {
Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes());
Object returnValue = m.invoke(so.getObj(), req.getParameters());
rsp = new LeisureResponse(LeisureStatus.SUCCESS);
rsp.setReturnValue(returnValue);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
rsp = new LeisureResponse(LeisureStatus.ERROR);
rsp.setException(e);
}
}
// 4、編組響應消息
return this.protocol.marshallingResponse(rsp);
}
// getter setter ...
}
網絡服務定義了啟動服務的細則,以及如何處理客戶端發來的請求。
3.4.3.3 RPC處理者
/**
* RPC處理者,支持服務啟動暴露、自動注入Service
*
* @author 東方雨傾
* @since 1.0.0
*/
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private ClientProxyFactory clientProxyFactory;
@Resource
private ServiceRegister serviceRegister;
@Resource
private RpcServer rpcServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (Objects.isNull(event.getApplicationContext().getParent())) {
ApplicationContext context = event.getApplicationContext();
// 開啟服務
startServer(context);
// 注入Service
injectService(context);
}
}
private void startServer(ApplicationContext context) {
Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
if (beans.size() != 0) {
boolean startServerFlag = true;
for (Object obj : beans.values()) {
try {
Class<?> clazz = obj.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
ServiceObject so;
if (interfaces.length != 1) {
Service service = clazz.getAnnotation(Service.class);
String value = service.value();
if (value.equals("")) {
startServerFlag = false;
throw new UnsupportedOperationException("The exposed interface is not specific with '" + obj.getClass().getName() + "'");
}
so = new ServiceObject(value, Class.forName(value), obj);
} else {
Class<?> superClass = interfaces[0];
so = new ServiceObject(superClass.getName(), superClass, obj);
}
serviceRegister.register(so);
} catch (Exception e) {
e.printStackTrace();
}
}
if (startServerFlag) {
rpcServer.start();
}
}
}
private void injectService(ApplicationContext context) {
String[] names = context.getBeanDefinitionNames();
for (String name : names) {
Class<?> clazz = context.getType(name);
if (Objects.isNull(clazz)) continue;
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
InjectService injectLeisure = field.getAnnotation(InjectService.class);
if (Objects.isNull(injectLeisure)) continue;
Class<?> fieldClass = field.getType();
Object object = context.getBean(name);
field.setAccessible(true);
try {
field.set(object, clientProxyFactory.getProxy(fieldClass));
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
}
DefaultRpcProcessor實現了ApplicationListener,並監聽了ContextRefreshedEvent事件,其效果就是在Spring啟動完畢過后會收到一個事件通知,基於這個機制,就可以在這里開啟服務,以及注入服務。因為一切已經准備就緒了,所需要的資源都是OK的。
四、使用RPC框架
框架一個很重要的特性就是要使用簡單,使用該框架只需要一個條件和四個步驟即可。
4.1 一個條件
需要准備一個Zookeeper作為注冊中心,單節點即可。
4.2 步驟一
引入Maven依賴:
<dependency>
<groupId>wang.leisure</groupId>
<artifactId>leisure-rpc-spring-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
不知道如何獲得依賴的讀者,請在源碼下載后,進入項目目錄下(pom.xml文件所在位置),執行 mvn install命令,即可在本地倉庫生成maven依賴。
4.3 步驟二
在你的項目配置文件(application.properties)中配置注冊中心地址,例如:
leisure.rpc.register-address=192.168.199.241:2181
4.4 步驟三
將你的遠程服務使用@Service注解,例如:
import wang.leisure.rpc.annotation.Service;
@Service
public class UserServiceImpl implements UserService {
@Override
public ApiResult<User> getUser(Long id) {
User user = getFromDbOrCache(id);
return ApiResult.success(user);
}
private User getFromDbOrCache(Long id) {
return new User(id, "東方雨傾", 1, "https://leisure.wang");
}
}
4.5 步驟四
使用注解@InjectService注入遠程服務,例如:
@RestController
@RequestMapping("/index/")
public class IndexController {
@InjectService
private UserService userService;
/**
* 獲取用戶信息
* http://localhost:8080/index/getUser?id=1
*
* @param id 用戶id
* @return 用戶信息
*/
@GetMapping("getUser")
public ApiResult<User> getUser(Long id) {
return userService.getUser(id);
}
}
五、源碼下載
框架源碼:leisure-rpc-spring-boot-starter
為方便讀者看到效果,筆者也簡單的編寫了一個示例項目,可以下載下來試試。如果源碼對你有一丁點的幫助,希望點個小星星支持一下哦。
六、總結
希望讀者能夠真正動手去試一試,只有實踐了才能知道里面的運作邏輯。筆者也是花了兩個星期才把代碼跟文章整理好,並不是因為這個東西難,而是因為沒時間,苦逼的程序早上七點起床,晚上10點左右回家,確實沒啥時間搞這些,哈哈哈。如果文章對你有幫助,希望多多支持。
原文地址:https://leisure.wang/procedural-framework/framework/704.html