2019-07-19:完成基本RPC通信!
2019-07-22:優化此框架,實現單一長連接!
2019-07-24:繼續優化此框架:1、增加服務提供注解(帶版本號),然后利用Spring框架的在啟動時立刻保存提供服務的實現類。2、優化NettyConfig(區分消費者和提供者配置),因為一個項目可同時作為服務提供者和服務消費者,所以增加兩個配置來區分是提供服務還是消費服務,而且,因為如果都是本地啟動着兩個項目,那么IP必定是一樣的,所以需要區分服務端口和消費端口。不然會有下面事故:先啟動client,再啟動server,但是他們同樣依賴於netty包,所以client也啟動了netty服務,只配置一個相同的端口會導致client的RPC通信也是通道自己啟動的Netty服務。。。
2019-07-27:優化此框架:增加注冊中心,使用Zookeeper作為注冊中心。
接下來:我會優化Netty方面的,例如增加心跳檢測、業務處理統一使用自定義業務線程池、客戶端或服務端異常斷開處理等,然后會優化一下項目的結構和rpc通信返回結果等,最后可能會考慮增加Redis作為注冊中心。等完成所有的這些,就會對整個項目重新寫一篇文章來介紹一下自己的整體思路,當然了,如果有同學需要的,可以在下方留言,我可以提前寫文章,對完成注冊中心及之前的代碼進行詳細介紹,之后再補充其他新增的功能實現過程!~
2019-07-30:已完成全部功能。放上連接:完整版RPC通信框架
下面的是2019-07-19寫的文章,所以代碼是沒經過優化的,不過是核心代碼,還是需要閱讀一下的,需要看完整代碼的請到最下面的github地址,大家可根據標簽拉到對應的代碼,麻煩啦然后還有,測試方法是HelloController的sayHello方法呢,也可以自己再搗鼓一些測試一下
前段時間,我花了兩個星期的時間去重新學習Netty,因為之前總是看過一會就沒看了,所以今次下定決心一定要全部看完,然后也思考做了一些的思考題,並且將簡單的控制台版IM系統做出來了。雖然叫IM系統,但是是很簡陋的,哈哈,只有登錄、單聊、建群、加群、退群、群聊等簡單的功能。大家可以到我github上看看:Netty-IM
寫完這個IM系統后,我是打算自己寫一個網頁版的,可是考慮到自己前端的技能好像都退化得差不多了,而且時間上可能沒那么充裕,就不了了之了。然后有一天,突然想起來之前使用的RPC框架->Dubbo,他的通信底層就是使用Netty,那么我就想着要不自己先搞個簡單版試試唄,因為最主要的是學習技能得實踐一番,不然學了好像沒學一樣。。。
在開始動手前,自己屢了一下思路,也參考了兩篇文章,決定先做一個簡版的RPC框架,不帶注冊中心的那種。那么來了老弟,首先我們看一下整個流程圖是咋樣的:
接下來重頭戲來了,下面將會較詳細得說一下流程:
先簡單介紹一下項目結構:
simple-rpc-client:服務消費
simple-rpc-server:服務提供
simple-rpc-encapsulation:消費者和提供者公共接口
simple-rpc-netty:是關於Netty的東西,包括:自定義協議,序列化,通信實體Packet,各種Handler等等。
客戶端:
1、首先是兩個注解,一個注解是:標識那些接口的調用會進行RPC通信,即@NettyRPC注解。
另外一個注解是:告訴程序哪些包下的類會使用RPC通信,像@ComponentScan一樣,即@EnableNettyRPC注解。
/**
* @author Howinfun
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NettyRPC {
}
/**
* @author Howinfun
* @desc
* @date 2019/7/15
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableNettyRPC {
//掃描的包名,如果為空,則根據啟動類所在的包名掃描
String[] basePackages() default {};
}
2、因為我們使用@NettyRPC的將是一些接口,如果項目里頭沒有實現類,那是調用失敗的,那么我們可以通過實現ImportBeanDefinitionRegistrar和自定義FactoryBean和InvocationHandler,利用動態代理使接口有實現,並且能動態注入Bean。ImportBeanDefinitionRegistrar接口可以詳細說一下,因為這里是動態注入Bean,怎么注入規則是可以自定的,主要是靠ClassPathScanningCandidateComponentProvider這個類,它主要功能是掃描ClassPath下的所有類,並且根據isCandidateComponent方法來判斷哪些類可以作為候選人,當然了,isCandidateComponent方法你可以重寫,然后加上你自己的規則,我這里是必須是獨立的並且是接口,才能成為候選人。然后ClassPathScanningCandidateComponentProvider還能添加過濾器,我這里主要添加的過濾器是注解過濾器,只要帶有@NettyRPC注解的,其他的都不要。
不過需要注意一點的是:記得在有@Configuration注解的配置類上使用@Import導入實現ImportBeanDefinitionRegistrar的類,不然實現動態注入Bean的作用,這里我們在客戶端的啟動類Import即可。
package com.hyf.rpc.netty.client.config;
import com.hyf.rpc.netty.anno.EnableNettyRPC;
import com.hyf.rpc.netty.anno.NettyRPC;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* 自定義注冊帶@NettyRPC注解的接口,利用動態代理使接口有實現
* 然后在有@Configuration注解的配置類上使用@Import導入,不然不能注入這些實現@NettyRPC接口的BeanDefinition
* @author Howinfun
* @date 2019-07-18
*/
public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {
private ClassLoader classLoader;
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider scan = getScanner();
//指定注解,類似於Feign注解,只掃描帶@NettyRPC注解的接口
scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));
Set<BeanDefinition> candidateComponents = new HashSet<>();
for (String basePackage : getBasePackages(importingClassMetadata)) {
candidateComponents.addAll(scan.findCandidateComponents(basePackage));
}
candidateComponents.stream().forEach(beanDefinition -> {
if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
if (beanDefinition instanceof AnnotatedBeanDefinition) {
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());
this.registerNettyRpcClient(registry, annotationMetadata,attributes);
}
}
});
}
private void registerNettyRpcClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
// 指定工廠,使用@NettyRPC注解的接口,當代碼中注入時,是從指定工廠獲取,而這里的工廠返回的是代理
BeanDefinitionBuilder definition = BeanDefinitionBuilder
.genericBeanDefinition(NettyClientFactoryBean.class);
// @Autowrie:根據類型注入
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
// 注定type屬性
definition.addPropertyValue("type", className);
String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
// 別名
String alias = name + "NettyRpcClient";
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setPrimary(true);
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
new String[] { alias });
// 注冊BeanDefinition
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false) {
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
// 判斷候選人的條件:必須是獨立的,然后是接口
if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
return true;
}
return false;
}
};
}
/**
* 獲取指定掃描@NettyRPC注解的包路徑
* @param importingClassMetadata
* @return
*/
protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
Map<String, Object> attributes = importingClassMetadata
.getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());
Set<String> basePackages = new HashSet<>();
// 如果指定的包路徑為空,則獲取啟動類當前路徑
if (basePackages.isEmpty()) {
basePackages.add(
ClassUtils.getPackageName(importingClassMetadata.getClassName()));
}else{
for (String pkg : (String[]) attributes.get("basePackages")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
}
return basePackages;
}
}
package com.hyf.rpc.netty.client.config;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;
import java.lang.reflect.Proxy;
/**
* @author Howinfun
* @desc
* @date 2019/7/15
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
public class NettyClientFactoryBean implements FactoryBean<Object> {
private Class<?> type;
@Override
public Object getObject() throws Exception {
// 這里的interfaces注意是就是type,因為我們現在是給接口做代理,千萬別寫type.getInterfaces(),不然啟動會報錯
return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
}
@Override
public Class<?> getObjectType() {
return this.type;
}
}
3、在動態代理的invoke方法里頭,我們將啟動Netty的一個客戶端,帶上接口調用的信息,然后等待Netty服務端返回結果結果再返回到前端即可。
package com.hyf.rpc.netty.client.config;
import com.hyf.rpc.netty.client.NettyClient;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
* @author Howinfun
* @desc
* @date 2019/7/15
*/
@NoArgsConstructor
@Component
public class NettyRPCInvocationHandler implements InvocationHandler {
private Class<?> type;
public NettyRPCInvocationHandler(Class<?> type){
this.type = type;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RPCRequestPacket requestPacket = new RPCRequestPacket();
requestPacket.setClazz(type);
requestPacket.setMethodName(method.getName());
requestPacket.setParamTypes(method.getParameterTypes());
requestPacket.setParams(args);
Object result = NettyClient.callRPC(requestPacket);
return result;
}
}
有一個坑是:當客戶端接收到服務端的返回結果后,記得關閉通道[ctx.channel().close()],因為在客戶端中RPC調用后是同步等待Channel關閉的,不然不能響應給前端。
服務端:服務端的流程稍微會簡單很多
1、啟動Netty服務端服務,然后接收客戶端的鏈接請求,解析請求
2、然后根據接口調用信息,利用反射獲取到實現類和對應的方法,最后調用方法得到結果,然后封裝一下結果就可以相應給客戶端了。
package com.hyf.rpc.netty.server.handler;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import com.hyf.rpc.netty.packet.RPCResponsePacket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.reflections.Reflections;
import java.lang.reflect.Method;
import java.util.Set;
/**
* @author Howinfun
* @desc
* @date 2019/7/16
*/
@ChannelHandler.Sharable
public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {
public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
private RPCRequestPacketHandler(){}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
RPCResponsePacket responsePacket = new RPCResponsePacket();
// 獲取rpc調用信息,利用反射執行方法,返回結果
Class clazz = msg.getClazz();
String methodName = msg.getMethodName();
Object[] params = msg.getParams();
Class[] paramTypes = msg.getParamTypes();
// 掃面路徑下所有元數據
Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
Set<Class> subTypes = reflections.getSubTypesOf(clazz);
if (subTypes.isEmpty()){
responsePacket.setSuccess(false);
responsePacket.setMsg("沒有實現類");
}else if (subTypes.size() > 1){
responsePacket.setSuccess(false);
responsePacket.setMsg("多個實現類,無法判斷執行哪一個");
}else{
Class subClass = subTypes.toArray(new Class[1])[0];
Method method = subClass.getMethod(methodName,paramTypes);
Object result = method.invoke(subClass.newInstance(),params);
responsePacket.setSuccess(true);
responsePacket.setResult(result);
}
ctx.channel().writeAndFlush(responsePacket);
}
}
3、這里的反射我推薦一個很好用的框架->Reflections。簡單介紹一下我使用了哪些API,首先是根據路徑掃描反射元數據,
然后根據接口獲取它的所有實現類,然后就可以獲取實現類的反射信息,得到方法執行結果了。
如果同學們對此比較簡陋的代碼還略感興趣,可以到我的碼雲上看看: