demo地址
https://gitee.com/syher/grave-netty
RPC介紹
首先了解一下RPC:遠程過程調用。簡單點說就是本地應用可以調用遠程服務器的接口。那么通過什么方式調用遠程接口呢?說白了RPC只是一種概念。他的調用可以基於HTTP實現,也可以基於TCP/IP實現。甚至私人定制的通訊協議。
當然,私人定制通訊協議成本過高且不具備通用性。我們不做展開討論(其實我也展不開。。。)。那為什么不使用HTTP協議呢?受限於HTTP協議層級過高,數據傳輸效率不如TCP/IP。所以RPC遠程調用一般采用TCP/IP實現。即調用socket方法。
RPC實現原理
1. 客戶端發起遠程服務調用。
2. 客戶端將類信息、調用方法和入參信息通過socket通道發送給服務端。
3. 服務端解析數據包,調用本地接口。
5.將執行結果通過socket返回給客戶端。
6.客戶端拿到並解析返回結果。
RPC實現
java如何實現一個rpc框架,其實就是按照上面的原理再做一些詳細的補充。比如通過動態代理封裝客戶端的數據包、通過反射機制實現服務端實現類的調用等等。
今天,我們先基於spring boot + netty 做rpc服務端的實現。
首先,做一個注解用於標識接口提供rpc調用。
1
2
3
4
5
|
@Target
(ElementType.TYPE)
@Retention
(RetentionPolicy.RUNTIME)
public
@interface
Service {
String name()
default
""
;
}
|
該注解用於提供服務的實現類上。
1
2
3
4
|
public
interface
INettyService {
String getString();
}
|
其實現類:
1
2
3
4
5
6
7
8
9
|
package
com.braska.grave.netty.server.service;
@Service
// 該注解為自定義rpc服務注解
public
class
NettyService
implements
INettyService {
@Override
public
String getString() {
return
"welcome to use netty rpc."
;
}
}
|
接着,定義一個注解用來掃描指定包名下的Service注解。
1
2
3
4
5
6
7
8
|
@Retention
(RetentionPolicy.RUNTIME)
@Target
({ElementType.TYPE})
@Documented
@Import
({NettyServerScannerRegistrar.
class
, NettyServerApplicationContextAware.
class
})
public
@interface
NettyServerScan {
String[] basePackages();
}
|
該注解用於spring boot啟動類上,參數basePackages指定服務所在的包路徑。
1
2
3
4
5
6
7
8
9
10
11
|
@SpringBootApplication
@NettyServerScan
(basePackages = {
"com.braska.grave.netty.server.service"
})
public
class
GraveNettyServerApplication {
public
static
void
main(String[] args) {
SpringApplication.run(GraveNettyServerApplication.
class
, args);
}
}
|
NettyServerScannerRegistrar類處理服務的spring bean注冊。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
class
NettyServerScannerRegistrar
implements
BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {
@Override
public
void
registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// 創建掃描器實例
NettyServerInterfaceScanner scanner =
new
NettyServerInterfaceScanner(registry);
if
(
this
.resourceLoader !=
null
) {
scanner.setResourceLoader(
this
.resourceLoader);
}
AnnotationAttributes annoAttrs =
AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyServerScan.
class
.getName()));
List<String> basePackages =
new
ArrayList<String>();
for
(String pkg : annoAttrs.getStringArray(
"basePackages"
)) {
if
(StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
// 只掃描指定的注解。
scanner.setAnnotationClass(Service.
class
);
scanner.registerFilters();
// 將basePackages里面的通過@Service注解的類注冊成spring bean。
scanner.doScan(StringUtils.toStringArray(basePackages));
}
}
|
NettyServerApplicationContextAware類,暴露socket server端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public
class
NettyServerApplicationContextAware
implements
ApplicationContextAware, InitializingBean {
private
static
final
Logger logger = Logger.getLogger(NettyServerApplicationContextAware.
class
.getName());
// 存儲接口與實現類的映射,其中key是接口名。value是實現類的bean。
private
Map<String, Object> serviceMap =
new
HashMap<>();
// 服務worker。包含netty socket服務端生命周期及讀寫。
ServerWorker runner;
@Override
public
void
setApplicationContext(ApplicationContext applicationContext)
throws
BeansException {
String address = applicationContext.getEnvironment().getProperty(
"remoteAddress"
);
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service.
class
);
for
(Object serviceBean : beans.values()) {
Class<?> clazz = serviceBean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for
(Class<?> inter : interfaces) {
String interfaceName = inter.getName();
serviceMap.put(interfaceName, serviceBean);
}
}
// 創建netty worker對象
runner =
new
ServerWorker(address, serviceMap);
}
@Override
public
void
afterPropertiesSet()
throws
Exception {
// 創建netty socketServer及通道處理器
runner.open();
}
}
|
ServerWorker類的open方法。
NettyServerHandler服務端channel處理器,繼承ChannelInboundHandlerAdapter。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@ChannelHandler
.Sharable
public
class
NettyServerHandler
extends
ChannelInboundHandlerAdapter {
private
Map<String, Object> serviceMap;
public
NettyServerHandler(Map<String, Object> serviceMap) {
this
.serviceMap = serviceMap;
}
@Override
public
void
channelRead(ChannelHandlerContext ctx, Object msg) {
// 解析客戶端發送過來的數據。包含類名、方法名、入參等信息。
Request request = JSON.parseObject(msg.toString(), Request.
class
);
Response response =
new
Response();
response.setRequestId(request.getId());
try
{
// 調用本地實現類
Object res =
this
.handler(request);
response.setData(res);
}
catch
(Exception e) {
response.setCode(-
1
);
response.setError(e.getMessage());
logger.log(Level.SEVERE,
"請求調用失敗"
, e);
}
// 返回處理結果給客戶端
ctx.writeAndFlush(response);
}
private
Object handler(Request request)
throws
Exception {
String className = request.getClassName();
// 通過className從beanMap映射中找到托管給spring的bean實現類。
Object serviceBean = serviceMap.get(className);
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
// 通過反射機制調用實現類。並返回調用結果。
return
MethodUtils.invokeMethod(serviceBean, methodName, parameters);
}
}
|
至此,rpc服務端的實現就完成了。
一路看下來,服務端的代碼實現還是比較簡單的。核心代碼只有兩個類:ServerWorker和NettyServerHandler。其余的都是對spring bean注冊的支持。