Building a Dubbo-style RPC framework on Netty 4, step by step, with a registry and annotation support



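Before reading this article, it is recommended to read the related articles first.

1. A detailed look at how network communication works under the hood in a distributed microservice architecture (illustrated)

2. (Skills for a 600K salary) After five years on the job, do you really understand Netty and why to use it? (in depth)

3. An in-depth analysis of Netty's core components (illustrations and examples)

4. A detail BAT interviewers always ask about: ByteBuf in Netty explained

5. How Netty solves packet splitting and sticking, worked through many practical cases

6. Implementing a custom message protocol on Netty (protocol design, parsing, and hands-on application)

7. A detailed and complete guide to serialization techniques, with in-depth analysis and practice

8. A hands-on guide to implementing a basic RPC framework on Netty (easy to follow)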

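This article continues to improve the basic hand-written Netty RPC implementation from the previous article. It introduces three things:

  • Spring integration, for annotation-driven configuration
  • ZooKeeper integration, for service registration
  • A load-balancing implementation

For the source code, add the WeChat account 「跟着Mic學架構」 and reply 『rpc』.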

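Adding annotation-driven configuration

The modules that change are:

  • netty-rpc-protocol
  • netty-rpc-provider

netty-rpc-protocol

The classes that change in this module are shown in Figure 7-1.

[Figure 7-1: classes changed in the netty-rpc-protocol module]

The changes to the netty-rpc-protocol module are as follows.

Adding the annotation

This annotation marks a service as a remote service.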

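@Target(ElementType.TYPE) // @Target restricts where the annotation may be applied; TYPE covers classes, interfaces (including annotation types) and enums
@Retention(RetentionPolicy.RUNTIME) // @Retention controls how long the annotation is kept; RUNTIME keeps it at run time, so it can be read through reflection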
@Component
public @interface GpRemoteService {

}
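
To make the role of RUNTIME retention concrete, here is a minimal, self-contained sketch of detecting a marker annotation through reflection. The names DemoRemoteService, GreeterImpl and PlainService are hypothetical and exist only for illustration. The stand-in annotation also leaves out Spring's @Component so that the example runs on the JDK alone.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationLookupSketch {

    // Hypothetical stand-in for GpRemoteService (no Spring meta-annotation here).
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoRemoteService {
    }

    interface Greeter {
        String greet(String name);
    }

    @DemoRemoteService
    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // A class without the marker annotation.
    static class PlainService {
    }

    // Returns true when the class carries the marker annotation at run time.
    static boolean isRemoteService(Class<?> type) {
        return type.isAnnotationPresent(DemoRemoteService.class);
    }

    public static void main(String[] args) {
        System.out.println(isRemoteService(GreeterImpl.class));
        System.out.println(isRemoteService(PlainService.class));
    }
}
```

With RetentionPolicy.CLASS or SOURCE instead of RUNTIME, the same check would return false for both classes, because the annotation would not be visible to reflection.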

SpringRpcProviderBean

This class starts the NettyServer and stores the bean mappings.

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress; 
    public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {
                new NettyServer(this.serverAddress,this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();
            }
        }).start();
    }
	
    // Called after each bean has been initialized
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // publish only services that carry this annotation
            Method[] methods=bean.getClass().getDeclaredMethods();
            for(Method method: methods){ // store the mapping for each method of the published bean
                String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
            }
        }
        return bean;
    }
}

Mediator

Manages the published beans and invokes them.

BeanMethod

@Data
public class BeanMethod {
    private Object bean;
    private Method method;
}

Mediator

Holds the published beans and invokes them through reflection.

public class Mediator {
    public static Map<String,BeanMethod> beanMethodMap=new ConcurrentHashMap<>();

    private volatile static Mediator instance=null;

    private Mediator(){
    }

    public static Mediator getInstance(){
        if(instance==null){
            synchronized (Mediator.class){
                if(instance==null){
                    instance=new Mediator();
                }
            }
        }
        return instance;
    }
    public Object processor(RpcRequest rpcRequest){
        String key=rpcRequest.getClassName()+"."+rpcRequest.getMethodName();
        BeanMethod beanMethod=beanMethodMap.get(key);
        if(beanMethod==null){
            return null;
        }
        Object bean=beanMethod.getBean();
        Method method=beanMethod.getMethod();
        try {
            return method.invoke(bean,rpcRequest.getParams());
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}
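
The key scheme used above ("interface name.method name") and the reflective call can be tried in isolation. The following is a minimal sketch on the JDK alone, under the assumption that each published bean implements exactly one interface. Entry, publish and dispatch are hypothetical names, not part of the article's code. Unlike processor above, the sketch rethrows reflection failures instead of printing them and returning null.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DispatchSketch {

    interface IUserService {
        String saveUser(String name);
    }

    static class UserServiceImpl implements IUserService {
        @Override
        public String saveUser(String name) {
            return "saved " + name;
        }
    }

    // Pairs a target object with one of its methods, like BeanMethod in the article.
    static final class Entry {
        final Object bean;
        final Method method;

        Entry(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    static final Map<String, Entry> REGISTRY = new ConcurrentHashMap<>();

    // Registers every declared method under "<first interface name>.<method name>".
    static void publish(Object bean) {
        String serviceName = bean.getClass().getInterfaces()[0].getName();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            REGISTRY.put(serviceName + "." + method.getName(), new Entry(bean, method));
        }
    }

    // Looks up the entry by key and invokes it reflectively; returns null for an unknown key.
    static Object dispatch(String className, String methodName, Object... params) {
        Entry entry = REGISTRY.get(className + "." + methodName);
        if (entry == null) {
            return null;
        }
        try {
            return entry.method.invoke(entry.bean, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed for " + methodName, e);
        }
    }

    public static void main(String[] args) {
        publish(new UserServiceImpl());
        System.out.println(dispatch(IUserService.class.getName(), "saveUser", "Mic"));
    }
}
```

Because the key contains only the method name, overloaded methods would overwrite each other in the map. Adding the parameter types to the key would be one way around that.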

RpcServerProperties

Defines the configuration properties.

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {

    private int servicePort;
}

RpcProviderAutoConfiguration

Defines the auto-configuration class.

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
    }
}

Modifying RpcServerHandler

Change the invocation so that it goes directly through Mediator.

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=Mediator.getInstance().processor(msg.getContent()); // this is the main change
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }
}

netty-rpc-provider

Two things change in this module:

  • application.properties
  • NettyRpcProviderMain

NettyRpcProviderMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.spring.service","com.example.service"})
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        // the manual instantiation from the previous version has been removed
    }
}

application.properties

Add one configuration property.

gp.rpc.servicePort=20880

UserServiceImpl

Publish the current service.

@GpRemoteService // publishes this service as a remote service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {
        log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

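Annotation-driven configuration on the client

The client also needs to reference services through an annotation, so that the details of remote communication are hidden completely. The code structure is shown in Figure 7-2.

[Figure 7-2: code structure of the client-side annotation support]

Adding the client annotation

Create the following annotation in the annotation directory of the netty-rpc-protocol module.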

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface GpRemoteReference {
}

SpringRpcReferenceBean

Defines a FactoryBean that builds the proxy used for remote communication.

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
    private String serviceAddress;
    private int servicePort;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init(){
        this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(this.serviceAddress,this.servicePort));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }
}
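
The init method above relies on JDK dynamic proxies. As a minimal sketch of that mechanism alone, the handler below only describes the intercepted call instead of sending it over the network. createProxy and the nested IUserService are hypothetical names used for illustration; in the article the handler role is played by RpcInvokerProxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {

    interface IUserService {
        String saveUser(String name);
    }

    // Builds a proxy for the interface; every call is routed to the handler.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> interfaceClass) {
        // The handler sees the method and its arguments, which is all an RPC client
        // needs to assemble a request. Here it just returns a description of the call.
        InvocationHandler handler = (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName() + "." + method.getName() + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                handler);
    }

    public static void main(String[] args) {
        IUserService userService = createProxy(IUserService.class);
        System.out.println(userService.saveUser("Mic")); // prints IUserService.saveUser[Mic]
    }
}
```

The caller only ever sees the interface, which is what lets a field of type IUserService be backed by a remote call.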

SpringRpcReferencePostProcessor

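Injects dynamic proxies for remote beans. It implements three Spring callbacks:

  • BeanClassLoaderAware: provides the bean class loader

  • BeanFactoryPostProcessor: runs after the Spring container has loaded the bean definitions and before any bean is instantiated

  • ApplicationContextAware: provides the ApplicationContext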

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    // holds the bean definitions of the remote references to be registered
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader=classLoader;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context=applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
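            // Walk the bean definitions, load each bean class, and check whether any of its fields carries @GpRemoteReference.
            // If one does, a dynamic proxy has to be built for it.
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){
                // resolves the class by name, much like Class.forName (it delegates to ClassUtils.forName internally)
                Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                // for each annotated field of this class, prepare a bean definition dynamically
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        // register a dynamic proxy bean for every field annotated with @GpRemoteReference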
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{
            if(context.containsBean(beanName)){
                log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            // register the dynamically created bean definition with the container
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){
        GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {
            BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}
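
The field scan performed by parseRpcReference can be sketched without Spring. The example below is an illustration under stated assumptions: DemoRemoteReference, HelloController and findReferences are hypothetical names, and plain reflection is used in place of ReflectionUtils.doWithFields and AnnotationUtils.getAnnotation. It collects the name and type of every annotated field, which is the information the post-processor needs to build a bean definition.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

class FieldScanSketch {

    // Hypothetical stand-in for GpRemoteReference (no @Autowired meta-annotation here).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DemoRemoteReference {
    }

    interface IUserService {
    }

    static class HelloController {
        @DemoRemoteReference
        private IUserService userService;

        private String unrelated;
    }

    // Collects field name -> field type for every annotated field,
    // walking up the class hierarchy as doWithFields does.
    static Map<String, Class<?>> findReferences(Class<?> type) {
        Map<String, Class<?>> found = new LinkedHashMap<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (field.isAnnotationPresent(DemoRemoteReference.class)) {
                    found.put(field.getName(), field.getType());
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findReferences(HelloController.class).keySet()); // prints [userService]
    }
}
```

As in the article's code, the field name serves as the key, so two controllers that declare differently typed fields with the same name would collide.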

RpcConstant needs a new INIT_METHOD_NAME constant.

public class RpcConstant {
    // total number of bytes in the header
    public final static int HEAD_TOTAL_LEN=16;
    // magic number
    public final static short MAGIC=0xca;

    public static final String INIT_METHOD_NAME = "init";
}

RpcClientProperties

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){
        String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment=environment;
    }
}

netty-rpc-consumer

Changes to the netty-rpc-consumer module:

  • Turn the module into a Spring Boot project
  • Add the web dependency
  • Add a test class

[Figure 7-3: the netty-rpc-consumer module]

Adding the dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

HelloController

@RestController
public class HelloController {

    @GpRemoteReference
    private IUserService userService;

    @GetMapping("/test")
    public String test(){
        return userService.saveUser("Mic");
    }
}

NettyConsumerMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.controller","com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {
    public static void main(String[] args) {
        SpringApplication.run(NettyConsumerMain.class, args);
    }
}

application.properties

gp.serviceAddress=192.168.1.102
gp.servicePort=20880

Testing

  • Start Netty-Rpc-Server
  • Start Netty-Rpc-Consumer

If both start without problems, call the /test endpoint of HelloController to test access to the remote service.

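Introducing the registry

Create a netty-rpc-registry module. The code structure is shown in Figure 7-4.

[Figure 7-4: code structure of the netty-rpc-registry module]

Adding the dependencies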

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>

IRegistryService

public interface IRegistryService {

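    /**
     * Registers a service.
     * @param serviceInfo the service to register
     * @throws Exception if registration fails
     */
    void register(ServiceInfo serviceInfo) throws Exception;

    /**
     * Removes a service registration.
     * @param serviceInfo the service to unregister
     * @throws Exception if unregistration fails
     */
    void unRegister(ServiceInfo serviceInfo) throws Exception;

    /**
     * Discovers an instance of a service dynamically.
     * @param serviceName the name of the service to look up
     * @return one instance of the service, or null if none is available
     * @throws Exception if the lookup fails
     */
    ServiceInfo discovery(String serviceName) throws Exception;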
}

ServiceInfo

@Data
public class ServiceInfo {
    private String serviceName;
    private String serviceAddress;
    private int servicePort;
}

ZookeeperRegistryService

@Slf4j
public class ZookeeperRegistryService implements IRegistryService {

    private static final String REGISTRY_PATH="/registry";
    // Curator's service discovery extension wraps registration and discovery. It is built around
    // ServiceInstance, ServiceProvider and ServiceDiscovery, which make service discovery easy to implement
    private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

    private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

    public ZookeeperRegistryService(String registryAddress) throws Exception {
        CuratorFramework client= CuratorFrameworkFactory
                .newClient(registryAddress,new ExponentialBackoffRetry(1000,3));
        // the client must be started before ServiceDiscovery can use it
        client.start();
        JsonInstanceSerializer<ServiceInfo> serializer=new JsonInstanceSerializer<>(ServiceInfo.class);
        this.serviceDiscovery= ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(serializer)
                .basePath(REGISTRY_PATH)
                .build();
        this.serviceDiscovery.start();
        loadBalance=new RandomLoadBalance();
    }

    @Override
    public void register(ServiceInfo serviceInfo) throws Exception {
        log.info("begin register service, {}",serviceInfo);
        ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance
                .<ServiceInfo>builder().name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceInfo serviceInfo) throws Exception {
        ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceInfo discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceInfo>> serviceInstances= serviceDiscovery
                .queryForInstances(serviceName);
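        // pick one concrete instance through load balancing;
        // copy into a List rather than casting, since queryForInstances only promises a Collection
        ServiceInstance<ServiceInfo> serviceInstance=loadBalance.select(new ArrayList<>(serviceInstances));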
        if(serviceInstance!=null){
            return serviceInstance.getPayload();
        }
        return null;
    }
}
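
The register / unRegister / discovery contract can be exercised without a running ZooKeeper. The following in-memory sketch is not the article's implementation and is not a substitute for Curator. Endpoint is a hypothetical value type standing in for ServiceInfo, and the random pick stands in for the load balancer. It may be useful for unit tests of code that depends on a registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

class InMemoryRegistrySketch {

    // Hypothetical value type standing in for ServiceInfo.
    static final class Endpoint {
        final String serviceName;
        final String address;
        final int port;

        Endpoint(String serviceName, String address, int port) {
            this.serviceName = serviceName;
            this.address = address;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Endpoint)) {
                return false;
            }
            Endpoint other = (Endpoint) o;
            return port == other.port
                    && serviceName.equals(other.serviceName)
                    && address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return Objects.hash(serviceName, address, port);
        }
    }

    // service name -> registered instances
    private final Map<String, List<Endpoint>> instances = new ConcurrentHashMap<>();

    void register(Endpoint endpoint) {
        instances.computeIfAbsent(endpoint.serviceName, k -> new CopyOnWriteArrayList<>()).add(endpoint);
    }

    void unRegister(Endpoint endpoint) {
        List<Endpoint> registered = instances.get(endpoint.serviceName);
        if (registered != null) {
            registered.remove(endpoint);
        }
    }

    // Returns one registered instance chosen at random, or null if there is none.
    Endpoint discovery(String serviceName) {
        List<Endpoint> registered = instances.get(serviceName);
        if (registered == null) {
            return null;
        }
        List<Endpoint> snapshot = new ArrayList<>(registered); // stable view for size + get
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        registry.register(new Endpoint("com.example.IUserService", "127.0.0.1", 20880));
        Endpoint found = registry.discovery("com.example.IUserService");
        System.out.println(found.address + ":" + found.port);
    }
}
```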

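Adding a load-balancing algorithm

Service discovery may return several server instances for the same service, so a load-balancing algorithm is needed to pick one of them.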

ILoadBalance

public interface ILoadBalance<T> {

    T select(List<T> servers);
}

AbstractLoadBalance

public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers){
        if(servers==null||servers.size()==0){
            return null;
        }
        if(servers.size()==1){
            return servers.get(0);
        }
        return doSelect(servers);
    }

    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}

RandomLoadBalance

public class RandomLoadBalance extends AbstractLoadBanalce {
    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
        int length=servers.size();
        Random random=new Random();
        return servers.get(random.nextInt(length));
    }
}
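
Random selection is only one possible strategy. As an illustration of how another one could plug into the same select(List) shape, here is a round-robin sketch. It is an alternative technique, not something the article implements. LoadBalance and RoundRobinLoadBalance are hypothetical names, and the interface is redeclared so the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    // Same shape as ILoadBalance in the article, redeclared for a self-contained example.
    interface LoadBalance<T> {
        T select(List<T> servers);
    }

    static class RoundRobinLoadBalance<T> implements LoadBalance<T> {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public T select(List<T> servers) {
            if (servers == null || servers.isEmpty()) {
                return null;
            }
            // floorMod keeps the index non-negative even after the counter overflows
            int index = Math.floorMod(counter.getAndIncrement(), servers.size());
            return servers.get(index);
        }
    }

    public static void main(String[] args) {
        LoadBalance<String> loadBalance = new RoundRobinLoadBalance<>();
        List<String> servers = Arrays.asList("192.168.1.102:20880", "192.168.1.102:20881");
        for (int i = 0; i < 4; i++) {
            System.out.println(loadBalance.select(servers));
        }
    }
}
```

Round-robin spreads calls evenly when the instance list is stable. If instances come and go between calls, the rotation simply continues over whatever list is passed in.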

RegistryType

public enum RegistryType {

    ZOOKEEPER((byte)0),
    EUREKA((byte)1);

    private byte code;

    RegistryType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }

    public static RegistryType findByCode(byte code) {
        for (RegistryType rt : RegistryType.values()) {
            if (rt.code() == code) {
                return rt;
            }
        }
        return null;
    }
}

RegistryFactory

public class RegistryFactory {

    public static IRegistryService createRegistryService(String address,RegistryType registryType){
        IRegistryService registryService=null;
        try {
            switch (registryType) {
                case ZOOKEEPER:
                    registryService = new ZookeeperRegistryService(address);
                    break;
                case EUREKA:
                    //TODO
                    break;
                default:
                    registryService = new ZookeeperRegistryService(address);
                    break;
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return registryService;
    }
}

Adding service registration on the server

Modify the netty-rpc-protocol module to add registry support.

SpringRpcProviderBean

The parts marked with comments below are the changes.

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;
    private final IRegistryService registryService; // changed: registry added
    public SpringRpcProviderBean(int serverPort,IRegistryService registryService) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();
        this.registryService=registryService; // changed: registry added
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {
                new NettyServer(this.serverAddress,this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();
            }
        }).start();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // publish only services that carry this annotation
            Method[] methods=bean.getClass().getDeclaredMethods();
            String serviceName=bean.getClass().getInterfaces()[0].getName();
            for(Method method: methods){
                String key=serviceName+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
            }
            try {
                // changed: register with the registry, once per service rather than once per method,
                // so that the same address does not show up as several instances
                ServiceInfo serviceInfo = new ServiceInfo();
                serviceInfo.setServiceAddress(this.serverAddress);
                serviceInfo.setServicePort(this.serverPort);
                serviceInfo.setServiceName(serviceName);
                registryService.register(serviceInfo);
            }catch (Exception e){
                log.error("register service {} failed",serviceName,e);
            }
        }
        return bean;
    }
}

RpcServerProperties

Modify RpcServerProperties to add the registry settings.

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {

    private int servicePort;

    private byte registerType;

    private String registryAddress;
}

RpcProviderAutoConfiguration

Inject the registry.

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        // create the registry service
        IRegistryService registryService=RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegisterType()));
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort(),registryService);
    }
}

application.properties

Modify application.properties in netty-rpc-provider.

gp.rpc.servicePort=20880
gp.rpc.registerType=0
gp.rpc.registryAddress=192.168.221.128:2181

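Adding service discovery on the client

The client needs more changes. All of the classes changed below are in the netty-rpc-protocol module.

RpcClientProperties

Add options for the registry type and the registry address.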

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;

    private byte registryType;

    private String registryAddress;

}

Modifying NettyClient

The server address used to be static. It is now obtained from the registry.

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
   /* private String serviceAddress;
    private int servicePort;*/
    public NettyClient(){
        log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
       /* this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;*/
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {
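        ServiceInfo serviceInfo=registryService.discovery(protocol.getContent().getClassName());
        if(serviceInfo==null){
            // discovery returns null when no instance is registered for the service
            throw new IllegalStateException("no available instance for service "+protocol.getContent().getClassName());
        }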
        ChannelFuture future=bootstrap.connect(serviceInfo.getServiceAddress(),serviceInfo.getServicePort()).sync();
        future.addListener(listener->{
            if(future.isSuccess()){
                log.info("connect rpc server {} success.",serviceInfo.getServiceAddress());
            }else{
                log.error("connect rpc server {} failed .",serviceInfo.getServiceAddress());
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

Modifying RpcInvokerProxy

Replace the static IP address and port with an IRegistryService.

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

   /* private String serviceAddress;
    private int servicePort;*/

    IRegistryService registryService;

    public RpcInvokerProxy(IRegistryService registryService) {
       /* this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;*/
        this.registryService=registryService;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        // assemble the request
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // send the request
        NettyClient nettyClient=new NettyClient();
        // register a future under the request id so the response can be matched to it asynchronously
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol,this.registryService);
        return future.getPromise().get().getData();
    }
}
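
The invoke method above parks a future under the request id before sending, and blocks on it until the response handler completes it. The sketch below shows that correlation pattern on the JDK alone. It swaps in CompletableFuture for the article's RpcFuture and Netty DefaultPromise, and newRequest, futureOf and complete are hypothetical helper names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RequestCorrelationSketch {

    static final AtomicLong REQUEST_ID = new AtomicLong();
    static final Map<Long, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    // Caller side: allocate an id and park a future under it before the request is sent.
    static long newRequest() {
        long id = REQUEST_ID.incrementAndGet();
        PENDING.put(id, new CompletableFuture<>());
        return id;
    }

    static CompletableFuture<Object> futureOf(long id) {
        return PENDING.get(id);
    }

    // Response side: look up the future by the id carried in the response header,
    // remove it so the map does not grow, and complete it with the payload.
    static boolean complete(long id, Object data) {
        CompletableFuture<Object> future = PENDING.remove(id);
        return future != null && future.complete(data);
    }

    public static void main(String[] args) {
        long id = newRequest();
        CompletableFuture<Object> future = futureOf(id);
        // In a real client this call would happen on the I/O thread when the response arrives.
        complete(id, "Save User Success!");
        System.out.println(future.join()); // prints Save User Success!
    }
}
```

Removing the entry on completion matters in a long-running client. A timeout on the waiting side, for example CompletableFuture.get(timeout, unit), would also keep a lost response from blocking the caller forever.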

SpringRpcReferenceBean

Modify the reference bean to add the registry settings.

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
   /* private String serviceAddress;
    private int servicePort;*/
    // changed: registry settings added
    private byte registryType;
    private String registryAddress;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init(){
        // changed: create the registry service and hand it to the proxy
        IRegistryService registryService= RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType));
        this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(registryService));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

   /* public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }*/

    public void setRegistryType(byte registryType) {
        this.registryType = registryType;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

SpringRpcReferencePostProcessor

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    // holds the bean definitions of the remote references to be registered
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader=classLoader;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context=applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
            // Walk the bean definitions, load each bean class, and check whether any of its fields carries @GpRemoteReference.
            // If one does, a dynamic proxy has to be built for it.
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){
                Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        // register a dynamic proxy bean for every field annotated with @GpRemoteReference
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{
            if(context.containsBean(beanName)){
                log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){
        GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {
            BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            /*builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());*/
            builder.addPropertyValue("registryType",clientProperties.getRegistryType());
            builder.addPropertyValue("registryAddress",clientProperties.getRegistryAddress());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){
        String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
        rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment=environment;
    }
}

application.properties

Modify the configuration in the netty-rpc-consumer module.

gp.serviceAddress=192.168.1.102
gp.servicePort=20880

gp.registryType=0
gp.registryAddress=192.168.221.128:2181

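Testing load balancing

Add a second startup class for the server and change its port. Then, without restarting the client, refresh the browser a few times to see load balancing take effect.

[Figure 7-5]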
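
What to expect from the test can be estimated offline. The sketch below simulates random selection over two hypothetical endpoints and counts how often each one is chosen. The endpoint strings, the seed and the simulate helper are assumptions made for illustration; the real result depends on which instances are registered in ZooKeeper.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class RandomSpreadSketch {

    // Picks a random endpoint `calls` times and counts how often each one is chosen.
    static Map<String, Integer> simulate(List<String> endpoints, int calls, long seed) {
        Random random = new Random(seed);
        Map<String, Integer> hits = new LinkedHashMap<>();
        for (String endpoint : endpoints) {
            hits.put(endpoint, 0);
        }
        for (int i = 0; i < calls; i++) {
            String chosen = endpoints.get(random.nextInt(endpoints.size()));
            hits.merge(chosen, 1, Integer::sum);
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> endpoints = Arrays.asList("192.168.1.102:20880", "192.168.1.102:20881");
        System.out.println(simulate(endpoints, 1000, 42L));
    }
}
```

With random selection the split is only roughly even, so a handful of browser refreshes may well hit the same server several times in a row.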


Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit Mic帶你學架構 when reposting.