上篇讲了RPC服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。
这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。
demo地址
https://gitee.com/syher/grave-netty
RPC实现
同样定义注解扫描service接口。
1
2
3
4
5
6
7
8
9
10
|
@Retention
(RetentionPolicy.RUNTIME)
@Target
({ElementType.TYPE})
@Documented
@Import
({NettyClientScannerRegistrar.
class
, NettyClientApplicationContextAware.
class
})
public
@interface
NettyClientScan {
String[] basePackages();
Class<?
extends
NettyFactoryBean> factoryBean()
default
NettyFactoryBean.
class
;
}
|
该注解用于spring boot启动类上,参数basePackages指定接口所在的包路径。
1
2
3
4
5
6
7
8
9
10
11
|
@SpringBootApplication
@NettyClientScan
(basePackages = {
"com.braska.grave.netty.api.service"
})
public
class
GraveNettyClientApplication {
public
static
void
main(String[] args) {
SpringApplication.run(GraveNettyClientApplication.
class
, args);
}
}
|
NettyServerScannerRegistrar类注册bean。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
class
NettyClientScannerRegistrar
implements
ImportBeanDefinitionRegistrar, ResourceLoaderAware {
@Override
public
void
registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// spring bean注册
NettyClientInterfaceScanner scanner =
new
NettyClientInterfaceScanner(registry);
AnnotationAttributes annoAttrs =
AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyClientScan.
class
.getName()));
Class<?
extends
NettyFactoryBean> nettyFactoryBeanClass = annoAttrs.getClass(
"factoryBean"
);
if
(!NettyFactoryBean.
class
.equals(nettyFactoryBeanClass)) {
scanner.setNettyFactoryBean(BeanUtils.instantiateClass(nettyFactoryBeanClass));
}
List<String> basePackages =
new
ArrayList<String>();
for
(String pkg : annoAttrs.getStringArray(
"basePackages"
)) {
if
(StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
scanner.doScan(StringUtils.toStringArray(basePackages));
}
}
|
NettyClientInterfaceScanner类使用jdk动态代理basePackages路径下的接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
class
NettyClientInterfaceScanner
extends
ClassPathBeanDefinitionScanner {
private
NettyFactoryBean nettyFactoryBean =
new
NettyFactoryBean();
@Override
public
Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitions =
super
.doScan(basePackages);
if
(beanDefinitions.isEmpty()) {
}
else
{
processBeanDefinitions(beanDefinitions);
}
return
beanDefinitions;
}
private
void
processBeanDefinitions(
Set<BeanDefinitionHolder> beanDefinitions) {
GenericBeanDefinition definition;
for
(BeanDefinitionHolder holder : beanDefinitions) {
definition = (GenericBeanDefinition) holder.getBeanDefinition();
// 为对象属性赋值(这一块我也还不太明白)
definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
// 这里的nettyFactoryBean是生成Bean实例的工厂,不是Bean本身
definition.setBeanClass(
this
.nettyFactoryBean.getClass());
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
}
}
}
|
NettyFactoryBean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
class
NettyFactoryBean<T>
implements
FactoryBean<T> {
private
Class<T> nettyInterface;
public
NettyFactoryBean() {}
public
NettyFactoryBean(Class<T> nettyInterface) {
this
.nettyInterface = nettyInterface;
}
@Override
public
T getObject()
throws
Exception {
// 通过jdk动态代理创建实例
return
(T) Proxy.newProxyInstance(nettyInterface.getClassLoader(),
new
Class[]{nettyInterface}, c.getInstance());
}
@Override
public
Class<?> getObjectType() {
return
this
.nettyInterface;
}
@Override
public
boolean
isSingleton() {
return
true
;
}
}
|
关键来了,NettyInterfaceInvoker类负责数据包封装及发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public
class
NettyInterfaceInvoker
implements
InvocationHandler {
private
RequestSender sender;
// 静态内部类做单例模式
private
static
class
SINGLETON {
private
static
final
NettyInterfaceInvoker invoker =
new
NettyInterfaceInvoker();
private
static
NettyInterfaceInvoker setSender(RequestSender sender) {
invoker.sender = sender;
return
invoker;
}
}
public
static
NettyInterfaceInvoker getInstance() {
return
SINGLETON.invoker;
}
public
static
NettyInterfaceInvoker setSender(RequestSender sender) {
return
SINGLETON.setSender(sender);
}
@Override
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
// 数据包封装,包含类名、方法名及参数等信息。
Request request =
new
Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
request.setId(UUID.randomUUID().toString());
// 数据发送
Object result = sender.send(request);
Class<?> returnType = method.getReturnType();
// 处理返回数据
Response response = JSON.parseObject(result.toString(), Response.
class
);
if
(response.getCode() ==
1
) {
throw
new
Exception(response.getError());
}
if
(returnType.isPrimitive() || String.
class
.isAssignableFrom(returnType)) {
return
response.getData();
}
else
if
(Collection.
class
.isAssignableFrom(returnType)) {
return
JSONArray.parseArray(response.getData().toString(), Object.
class
);
}
else
if
(Map.
class
.isAssignableFrom(returnType)) {
return
JSON.parseObject(response.getData().toString(), Map.
class
);
}
else
{
Object data = response.getData();
return
JSONObject.parseObject(data.toString(), returnType);
}
}
}
|
接着我们来看看RequestSender怎么处理数据的。
1
2
3
4
5
|
public
interface
RequestSender {
Channel connect(SocketAddress address)
throws
InterruptedException;
Object send(Request request)
throws
InterruptedException;
}
|
RequestSender本身只是一个接口。他的实现类有:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
public
class
NettyClientApplicationContextAware
extends
ChannelInitializer<SocketChannel>
implements
RequestSender, ApplicationContextAware, InitializingBean {
private
static
final
Logger logger = Logger.getLogger(NettyClientApplicationContextAware.
class
.getName());
private
String remoteAddress;
private
Bootstrap bootstrap;
private
EventLoopGroup group;
private
NettyChannelManager manager;
private
NettyClientHandler handler;
@Override
public
void
setApplicationContext(ApplicationContext applicationContext)
throws
BeansException {
this
.remoteAddress = applicationContext.getEnvironment().getProperty(
"remoteAddress"
);
this
.bootstrap =
new
Bootstrap();
this
.group =
new
NioEventLoopGroup(
1
);
this
.bootstrap.group(group).
channel(NioSocketChannel.
class
).
option(ChannelOption.TCP_NODELAY,
true
).
option(ChannelOption.SO_KEEPALIVE,
true
).
handler(
this
);
this
.manager =
new
NettyChannelManager(
this
);
this
.handler =
new
NettyClientHandler(manager, remoteAddress);
}
@Override
public
void
afterPropertiesSet()
throws
Exception {
// socket连接入口。
this
.manager.refresh(Lists.newArrayList(remoteAddress));
}
@Override
public
Object send(Request request)
throws
InterruptedException {
Channel channel = manager.take();
if
(channel !=
null
&& channel.isActive()) {
SynchronousQueue<Object> queue =
this
.handler.sendRequest(request, channel);
Object result = queue.take();
return
JSONArray.toJSONString(result);
}
else
{
Response res =
new
Response();
res.setCode(
1
);
res.setError(
"未正确连接到服务器.请检查相关配置信息!"
);
return
JSONArray.toJSONString(res);
}
}
@Override
protected
void
initChannel(SocketChannel channel)
throws
Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(
new
IdleStateHandler(
0
,
0
,
30
));
pipeline.addLast(
new
JSONEncoder());
pipeline.addLast(
new
JSONDecoder());
// 管道处理器
pipeline.addLast(
this
.handler);
}
@Override
public
Channel connect(SocketAddress address)
throws
InterruptedException {
ChannelFuture future = bootstrap.connect(address);
// 建立长连接,提供失败重连。
future.addListener(
new
ConnectionListener(
this
.manager,
this
.remoteAddress));
Channel channel = future.channel();
//future.sync().channel();
return
channel;
}
public
void
destroy() {
this
.group.shutdownGracefully();
}
}
|
NettyClientHandler类处理管道事件。与服务端不通,这个管道处理器是继承ChannelInboundHandlerAdapter类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
@ChannelHandler
.Sharable
public
class
NettyClientHandler
extends
ChannelInboundHandlerAdapter {
private
static
final
Logger logger = Logger.getLogger(NettyServerHandler.
class
.getName());
private
ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap =
new
ConcurrentHashMap<>();
private
NettyChannelManager manager;
private
String remoteAddress;
public
NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
this
.manager = manager;
this
.remoteAddress = remoteAddress;
}
@Override
public
void
channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
logger.info(
"与netty服务器断开连接."
+ address);
ctx.channel().close();
manager.remove(ctx.channel());
// 掉线重连
final
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
manager.refresh(Lists.newArrayList(remoteAddress));
}, 1L, TimeUnit.SECONDS);
}
@Override
public
void
channelRead(ChannelHandlerContext ctx, Object msg)
throws
Exception {
// 处理服务端返回的数据
Response response = JSON.parseObject(msg.toString(), Response.
class
);
String requestId = response.getRequestId();
SynchronousQueue<Object> queue = queueMap.get(requestId);
queue.put(response);
queueMap.remove(requestId);
}
public
SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
// 使用阻塞队列处理客户端请求
SynchronousQueue<Object> queue =
new
SynchronousQueue<>();
queueMap.put(request.getId(), queue);
channel.writeAndFlush(request);
return
queue;
}
public
void
userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws
Exception {
logger.info(
"发送心跳消息..."
);
if
(evt
instanceof
IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if
(event.state() == IdleState.ALL_IDLE) {
Request request =
new
Request();
request.setMethodName(
"heartBeat"
);
ctx.channel().writeAndFlush(request);
}
}
else
{
super
.userEventTriggered(ctx, evt);
}
}
}
|
这样,RPC的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。
结合上篇RPC服务端。一个完整的RPC框架就搭建完了。
当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。