SpringBoot2+Netty打造通俗簡版RPC通信框架


2019-07-19:完成基本RPC通信!

2019-07-22:優化此框架,實現單一長連接!

2019-07-24:繼續優化此框架:1、增加服務提供注解(帶版本號),然后利用Spring框架的在啟動時立刻保存提供服務的實現類。2、優化NettyConfig(區分消費者和提供者配置),因為一個項目可同時作為服務提供者和服務消費者,所以增加兩個配置來區分是提供服務還是消費服務,而且,因為如果都是本地啟動着兩個項目,那么IP必定是一樣的,所以需要區分服務端口和消費端口。不然會有下面事故:先啟動client,再啟動server,但是他們同樣依賴於netty包,所以client也啟動了netty服務,只配置一個相同的端口會導致client的RPC通信也是通道自己啟動的Netty服務。。。

2019-07-27:優化此框架:增加注冊中心,使用Zookeeper作為注冊中心。

接下來:我會優化Netty方面的,例如增加心跳檢測、業務處理統一使用自定義業務線程池、客戶端或服務端異常斷開處理等,然后會優化一下項目的結構和rpc通信返回結果等,最后可能會考慮增加Redis作為注冊中心。等完成所有的這些,就會對整個項目重新寫一篇文章來介紹一下自己的整體思路,當然了,如果有同學需要的,可以在下方留言,我可以提前寫文章,對完成注冊中心及之前的代碼進行詳細介紹,之后再補充其他新增的功能實現過程!~

2019-07-30:已完成全部功能。放上連接:完整版RPC通信框架

下面的是2019-07-19寫的文章,所以代碼是沒經過優化的,不過是核心代碼,還是需要閱讀一下的,需要看完整代碼的請到最下面的github地址,大家可根據標簽拉到對應的代碼,麻煩啦然后還有,測試方法是HelloController的sayHello方法呢,也可以自己再搗鼓一些測試一下

​​​​

前段時間,我花了兩個星期的時間去重新學習Netty,因為之前總是看過一會就沒看了,所以今次下定決心一定要全部看完,然后也思考做了一些的思考題,並且將簡單的控制台版IM系統做出來了。雖然叫IM系統,但是是很簡陋的,哈哈,只有登錄、單聊、建群、加群、退群、群聊等簡單的功能。大家可以到我github上看看:Netty-IM

寫完這個IM系統后,我是打算自己寫一個網頁版的,可是考慮到自己前端的技能好像都退化得差不多了,而且時間上可能沒那么充裕,就不了了之了。然后有一天,突然想起來之前使用的RPC框架->Dubbo,他的通信底層就是使用Netty,那么我就想着要不自己先搞個簡單版試試唄,因為最主要的是學習技能得實踐一番,不然學了好像沒學一樣。。。

在開始動手前,自己屢了一下思路,也參考了兩篇文章,決定先做一個簡版的RPC框架,不帶注冊中心的那種。那么來了老弟,首先我們看一下整個流程圖是咋樣的:

接下來重頭戲來了,下面將會較詳細得說一下流程:

先簡單介紹一下項目結構:

simple-rpc-client:服務消費

simple-rpc-server:服務提供

simple-rpc-encapsulation:消費者和提供者公共接口

simple-rpc-netty:是關於Netty的東西,包括:自定義協議,序列化,通信實體Packet,各種Handler等等。

客戶端:
        1、首先是兩個注解,一個注解是:標識那些接口的調用會進行RPC通信,即@NettyRPC注解。
        另外一個注解是:告訴程序哪些包下的類會使用RPC通信,像@ComponentScan一樣,即@EnableNettyRPC注解。

/**
 * @author Howinfun
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NettyRPC {
}
/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableNettyRPC {
    //掃描的包名,如果為空,則根據啟動類所在的包名掃描
    String[] basePackages() default {};
}

2、因為我們使用@NettyRPC的將是一些接口,如果項目里頭沒有實現類,那是調用失敗的,那么我們可以通過實現ImportBeanDefinitionRegistrar和自定義FactoryBean和InvocationHandler,利用動態代理使接口有實現,並且能動態注入Bean。ImportBeanDefinitionRegistrar接口可以詳細說一下,因為這里是動態注入Bean,怎么注入規則是可以自定的,主要是靠ClassPathScanningCandidateComponentProvider這個類,它主要功能是掃描ClassPath下的所有類,並且根據isCandidateComponent方法來判斷哪些類可以作為候選人,當然了,isCandidateComponent方法你可以重寫,然后加上你自己的規則,我這里是必須是獨立的並且是接口,才能成為候選人。然后ClassPathScanningCandidateComponentProvider還能添加過濾器,我這里主要添加的過濾器是注解過濾器,只要帶有@NettyRPC注解的,其他的都不要。
    不過需要注意一點的是:記得在有@Configuration注解的配置類上使用@Import導入實現ImportBeanDefinitionRegistrar的類,不然實現動態注入Bean的作用,這里我們在客戶端的啟動類Import即可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.anno.EnableNettyRPC;
import com.hyf.rpc.netty.anno.NettyRPC;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;


/**
 * 自定義注冊帶@NettyRPC注解的接口,利用動態代理使接口有實現
 * 然后在有@Configuration注解的配置類上使用@Import導入,不然不能注入這些實現@NettyRPC接口的BeanDefinition
 * @author Howinfun
 * @date 2019-07-18
 */
public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

    private ClassLoader classLoader;

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        ClassPathScanningCandidateComponentProvider scan = getScanner();

        //指定注解,類似於Feign注解,只掃描帶@NettyRPC注解的接口
        scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));

        Set<BeanDefinition> candidateComponents = new HashSet<>();
        for (String basePackage : getBasePackages(importingClassMetadata)) {
            candidateComponents.addAll(scan.findCandidateComponents(basePackage));
        }
        candidateComponents.stream().forEach(beanDefinition -> {
            if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
                if (beanDefinition instanceof AnnotatedBeanDefinition) {
                    AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                    AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());

                    this.registerNettyRpcClient(registry, annotationMetadata,attributes);
                }
            }
        });
    }

    private void registerNettyRpcClient(BeanDefinitionRegistry registry,
                                        AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // 指定工廠,使用@NettyRPC注解的接口,當代碼中注入時,是從指定工廠獲取,而這里的工廠返回的是代理
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(NettyClientFactoryBean.class);
        // @Autowrie:根據類型注入
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        // 注定type屬性
        definition.addPropertyValue("type", className);
        String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
        // 別名
        String alias = name + "NettyRpcClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setPrimary(true);
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[] { alias });
        // 注冊BeanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }



    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                // 判斷候選人的條件:必須是獨立的,然后是接口
                if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
                    return true;
                }
                return false;
            }
        };
    }

    /**
     * 獲取指定掃描@NettyRPC注解的包路徑
     * @param importingClassMetadata
     * @return
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata
                .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();
        // 如果指定的包路徑為空,則獲取啟動類當前路徑
        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }else{
            for (String pkg : (String[]) attributes.get("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }
        return basePackages;
    }
}
package com.hyf.rpc.netty.client.config;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
public class NettyClientFactoryBean implements FactoryBean<Object> {

    private Class<?> type;

    @Override
    public Object getObject() throws Exception {
        // 這里的interfaces注意是就是type,因為我們現在是給接口做代理,千萬別寫type.getInterfaces(),不然啟動會報錯
        return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}

3、在動態代理的invoke方法里頭,我們將啟動Netty的一個客戶端,帶上接口調用的信息,然后等待Netty服務端返回結果結果再返回到前端即可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.client.NettyClient;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@NoArgsConstructor
@Component
public class NettyRPCInvocationHandler implements InvocationHandler {

    private Class<?> type;

    public NettyRPCInvocationHandler(Class<?> type){
        this.type = type;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RPCRequestPacket requestPacket = new RPCRequestPacket();
        requestPacket.setClazz(type);
        requestPacket.setMethodName(method.getName());
        requestPacket.setParamTypes(method.getParameterTypes());
        requestPacket.setParams(args);
        Object result = NettyClient.callRPC(requestPacket);
        return result;
    }
}

有一個坑是:當客戶端接收到服務端的返回結果后,記得關閉通道[ctx.channel().close()],因為在客戶端中RPC調用后是同步等待Channel關閉的,不然不能響應給前端。

服務端:服務端的流程稍微會簡單很多
        1、啟動Netty服務端服務,然后接收客戶端的鏈接請求,解析請求
        2、然后根據接口調用信息,利用反射獲取到實現類和對應的方法,最后調用方法得到結果,然后封裝一下結果就可以相應給客戶端了。

package com.hyf.rpc.netty.server.handler;

import com.hyf.rpc.netty.packet.RPCRequestPacket;
import com.hyf.rpc.netty.packet.RPCResponsePacket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/16
 */
@ChannelHandler.Sharable
public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {

    public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
    private RPCRequestPacketHandler(){}

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
        RPCResponsePacket responsePacket = new RPCResponsePacket();
        // 獲取rpc調用信息,利用反射執行方法,返回結果
        Class clazz = msg.getClazz();
        String methodName = msg.getMethodName();
        Object[] params = msg.getParams();
        Class[] paramTypes = msg.getParamTypes();
        // 掃面路徑下所有元數據
        Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
        Set<Class> subTypes = reflections.getSubTypesOf(clazz);
        if (subTypes.isEmpty()){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("沒有實現類");
        }else if (subTypes.size() > 1){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("多個實現類,無法判斷執行哪一個");
        }else{
            Class subClass = subTypes.toArray(new Class[1])[0];
            Method method = subClass.getMethod(methodName,paramTypes);
            Object result = method.invoke(subClass.newInstance(),params);
            responsePacket.setSuccess(true);
            responsePacket.setResult(result);
        }
        ctx.channel().writeAndFlush(responsePacket);
    }
}

3、這里的反射我推薦一個很好用的框架->Reflections。簡單介紹一下我使用了哪些API,首先是根據路徑掃描反射元數據,
    然后根據接口獲取它的所有實現類,然后就可以獲取實現類的反射信息,得到方法執行結果了。

如果同學們對此比較簡陋的代碼還略感興趣,可以到我的碼雲上看看:Netty-RPC


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM