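Goal: implement RPC (remote procedure call) on top of the Netty framework, as groundwork for building a distributed service framework later. The custom protocol implemented here is mainly aimed at remote method invocation.
Technical analysis:
1. Java reflection lets us read an object's properties and invoke a specified method. So, given the name of the target object, the method name, the argument values, and the parameter types, we can invoke the object dynamically.
2. Netty gives us NIO-based (non-blocking, asynchronous) data transfer with high concurrency and high throughput.
3. A proxy (JDK or CGLIB) provides the dynamic proxying.
Implementation:
1. JDK dynamic proxy implementation
Approach: because there can be several proxying strategies, an abstract proxy factory is used.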
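Point 1 can be illustrated with a minimal sketch, assuming a made-up `Greeter` service. The names `ReflectiveInvokeDemo`, `Greeter`, and `GreeterImpl` are hypothetical and exist only for this example; the only real API used is `java.lang.reflect.Method`.

```java
import java.lang.reflect.Method;

final class ReflectiveInvokeDemo {

    /** Hypothetical service interface, used only for this illustration. */
    interface Greeter {
        String greet(String name);
    }

    /** Hypothetical implementation that plays the role of the server-side service object. */
    static final class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    /** Looks up a public method by name and parameter types, then invokes it on the target. */
    static Object invoke(Object target, String methodName, Class<?>[] paramTypes, Object[] args)
            throws ReflectiveOperationException {
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return method.invoke(target, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = invoke(new GreeterImpl(), "greet",
                new Class<?>[] { String.class }, new Object[] { "RPC" });
        System.out.println(result); // prints "Hello, RPC"
    }
}
```

The method name, parameter types, and argument values are exactly the pieces of information an RPC request has to carry across the wire.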
AbstractProxyFactory.java
public abstract class AbstractProxyFactory {

    // public static final String JDK_PROXY_NAME = "jdkProxy";
    public static final String JDK_PROXY_NAME = "com.jewel.proxy.JdkProxyFactory";

    public static AbstractProxyFactory create(String name) {
        AbstractProxyFactory apf = null;
        try {
            apf = (AbstractProxyFactory) Class.forName(name).newInstance();
        } catch (Exception e) {
            e.printStackTrace();
        }
        /*if (name.equals(JDK_PROXY_NAME)) {
            return new JdkProxyFactory();
        }*/
        return apf;
    }

    public abstract Object proxy(String interfaceName, String version, AbstractLoadCluster abstractLoadCluster);

    public abstract Object proxy(String interfaceName, String version);
}
The JDK dynamic proxy implementation
JdkProxyFactory.java
public class JdkProxyFactory extends AbstractProxyFactory {

    public Object proxy(String interfaceName, String version, AbstractLoadCluster abstractLoadCluster) {
        try {
            return Proxy.newProxyInstance(Class.forName(interfaceName).getClassLoader(),
                    new Class<?>[] { Class.forName(interfaceName) },
                    new JewelInvocationHandler(version, interfaceName, abstractLoadCluster));
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Object proxy(String interfaceName, String version) {
        try {
            return Proxy.newProxyInstance(Class.forName(interfaceName).getClassLoader(),
                    new Class<?>[] { Class.forName(interfaceName) },
                    new JewelInvocationHandler(version, interfaceName));
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}
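JewelInvocationHandler is not shown in this post. As a rough idea of what an invocation handler behind such a proxy does, here is a minimal sketch, assuming a made-up `EchoService` interface. Instead of sending anything over the network, the handler only describes the intercepted call; `JdkProxyDemo`, `EchoService`, and the string format are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class JdkProxyDemo {

    /** Hypothetical service interface; a real one would be shared by client and server. */
    interface EchoService {
        String echo(String text);
    }

    /** Creates a proxy whose handler describes each call instead of executing it remotely. */
    @SuppressWarnings("unchecked")
    static <T> T proxy(Class<T> iface, String version) {
        // A real handler would build a request from these values and send it to the server.
        InvocationHandler handler = (proxyObj, method, args) ->
                iface.getName() + "#" + method.getName() + "@" + version + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        EchoService service = proxy(EchoService.class, "1.0");
        // The call never reaches an implementation; the handler intercepts it.
        System.out.println(service.echo("hi"));
    }
}
```

The handler receives the interface method and its arguments, which is the information the client side needs in order to fill in an RPC request.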
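2. Netty transport setup
Approach: custom events are used to monitor the state of the transport layer, and protostuff is used to encode and decode the data.
2.1 Encoding and decoding
protostuff helper class: SerializationUtil.java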
public class SerializationUtil {

    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(Object obj, Class<T> clazz) {
        Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(clazz);
        LinkedBuffer buffer = LinkedBuffer.allocate(4096);
        byte[] protostuff = null;
        try {
            protostuff = ProtostuffIOUtil.toByteArray((T) obj, schema, buffer);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            buffer.clear();
        }
        return protostuff;
    }

    public static <T> T deserialize(byte[] buff, Class<T> clazz) {
        T object = null;
        try {
            // the target class needs an accessible no-argument constructor
            object = clazz.newInstance();
            Schema<T> schema = RuntimeSchema.getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(buff, object, schema);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return object;
    }
}
Encoder: RPCEncoder.java
public class RPCEncoder extends MessageToByteEncoder<Object> {
    private Class<?> clazz;

    public RPCEncoder(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (clazz.isInstance(msg)) {
            // serialize, then write the length followed by the payload
            byte[] buff = SerializationUtil.serialize(msg, clazz);
            out.writeInt(buff.length);
            out.writeBytes(buff);
        }
    }
}
Decoder: RPCDecoder.java
public class RPCDecoder extends ByteToMessageDecoder {
    private Class<?> clazz;

    public RPCDecoder(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // invalid length: close the connection and stop decoding
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // incomplete frame: rewind and wait for more bytes
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        // deserialize
        Object object = SerializationUtil.deserialize(data, clazz);
        out.add(object);
    }
}
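The framing that RPCEncoder and RPCDecoder rely on (a 4-byte length followed by the payload) can be tried out without Netty. The following is a minimal sketch, assuming `java.nio.ByteBuffer` in place of Netty's `ByteBuf`; `LengthPrefixDemo` and its methods are hypothetical names for this illustration. The important detail is that an incomplete frame makes the decoder rewind and return without consuming anything.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixDemo {

    /** Writes a 4-byte big-endian length followed by the payload. */
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Tries to read one frame. Returns null and restores the read position
     * when the buffer does not yet hold a complete frame.
     */
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // the length field itself has not fully arrived
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // rewind so the length is read again on the next attempt
            return null;
        }
        byte[] data = new byte[length];
        in.get(data);
        return data;
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        // Only the first 6 of 8 bytes are visible: the decoder has to wait.
        System.out.println(decode(ByteBuffer.wrap(frame, 0, 6)) == null); // prints "true"
        System.out.println(new String(decode(ByteBuffer.wrap(frame)), StandardCharsets.UTF_8)); // prints "ping"
    }
}
```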
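2.2 Transport-layer state event listening
StateEvent.java
public class StateEvent {
public String clientId;// client id, consistent with the jewel protocol layer
private boolean isActice = true;// the channel is usable
private boolean isClose = false;// the connection is closed and unusable
private boolean isValid = false;// the channel is idle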
public boolean isClose() {
return isClose;
}
public void setValid(boolean isValid) {
this.isValid = isValid;
}
public boolean isValid() {
return this.isValid;
}
public void setClose(boolean isClose) {
this.isClose = isClose;
}
private boolean reConnect;
public void setReConnect(boolean reConnect) {
this.reConnect = reConnect;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getClientId() {
return clientId;
}
public boolean isReConnect() {
return reConnect;
}
public boolean isActive() {
return isActice;
}
public void setActice(boolean isActice) {
this.isActice = isActice;
}
@Override
public String toString() {
return "StateEvent [clientId=" + clientId + ", isActice=" + isActice + ", isClose=" + isClose + ", isValid=" + isValid + ", reConnect="
+ reConnect + "]";
}
}
ProtocolActiveListener.java
/**
 * Listener for the connection state of the Netty client.
 *
 * @author maybo
 * @date 2016-05-13 15:12:24
 */
public interface ProtocolActiveListener {
/**
 * Event callback that reports the client connection state.
 *
 * @param stateEvent the state object
 */
public void clientStateEvent(StateEvent stateEvent);
}
2.3 Server layer
RPCServer.java
public class RPCServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RPCServer.class);

    public static String IP = "127.0.0.1";
    public static int PORT = 9998;
    public static int READ_IDLE_TIME = 60;// read idle time, in seconds
    public static int WRITE_IDLE_TIME = 30;// write idle time, in seconds
    public static int ALL_IDLE_TIME = 10;// read/write idle time, in seconds
    public static int TIMEOUT_SECONDS = 300;

    /** Size of the thread group that dispatches work to the business threads. */
    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // default

    /** Number of business-handling threads. */
    protected static final int BIZTHREADSIZE = 4;

    /*
     * NioEventLoopGroup is effectively a thread pool.
     * It starts n NioEventLoops in the background to handle Channel events,
     * each NioEventLoop is responsible for m Channels,
     * and NioEventLoopGroup takes NioEventLoops from its array one by one to handle Channels.
     */
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

    public static void run() throws Exception {
        LOGGER.info("----------------------------------------starting server--IP:" + IP + ":" + PORT + "------------------------");
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        b.channel(NioServerSocketChannel.class);
        b.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME, WRITE_IDLE_TIME, ALL_IDLE_TIME, TimeUnit.SECONDS));
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast(new RPCDecoder(RPCRequest.class));
                pipeline.addLast(new RPCEncoder(RPCResponse.class));
                pipeline.addLast(new RPCServerHandler());
            }
        });
        b.bind(IP, PORT).sync();
        LOGGER.info("----------------------------------------server started-------------------------");
    }

    protected static void shutdown() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        RPCServer.run();
    }
}
RPCServerHandler.java
public class RPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {

    private static FutureTaskService taskService = FutureTaskService.newInstance();
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, RPCRequest request) {
        if (request.getType() != 1) {// service invocation (type 1 is a heartbeat)
            try {
                logger.info("Channel" + ctx.channel().id().asShortText() + "-------------" + "From:" + "jewel:"
                        + ctx.channel().localAddress() + "/" + request.getInterfaceName() + "/" + request.getMethodName());
                taskService.invoke(request, ctx.channel().id().asShortText(), ctx);
            } catch (InterruptedException e) {
                RPCResponse response = new RPCResponse();
                response.setDate(new Date());
                response.setError(new RuntimeException());
                response.setResponseId(request.getRequestId());
                response.setState("505");
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }
                e.printStackTrace();
            } catch (ExecutionException e) {
                RPCResponse response = new RPCResponse();
                response.setDate(new Date());
                response.setError(new RuntimeException());
                response.setResponseId(request.getRequestId());
                response.setState("510");
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }
                e.printStackTrace();
            }
        }
    }

    /**
     * Callback fired when no read or write has happened for a while.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                // read timeout: close the channel
                ctx.close();
            } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                // write timeout: nothing to do
            } else if (event.state().equals(IdleState.ALL_IDLE)) {
                // no read or write: send a heartbeat
                RPCResponse response = new RPCResponse();
                response.setType(1);
                if (ctx.channel().isActive()) {
                    ctx.channel().writeAndFlush(response);
                }
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        super.exceptionCaught(ctx, e);
        if (e instanceof IOException) {
            ctx.channel().close();
        }
        e.printStackTrace();
    }
}
2.4 Client layer
RPCClient.java
public class RPCClient {
private Logger logger = LoggerFactory.getLogger(this.getClass());
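private String id;// client id
public static String HOST = "127.0.0.1";
public static int PORT = 9999;// note: RPCServer.PORT defaults to 9998, so set this to the server's port before connecting
public static int TIMEOUT__SECONDS = 120;// request timeout, in seconds
public static int CONNECT_TIMEOUT_SECONDS = 3;// connect timeout, in seconds
public static int RECONN_TIME_SECONDS = 3;// delay before reconnecting, in seconds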
private ChannelFuture channelFuture;
private int connectAmount = 0;
private RPCClient client = null;
private Bootstrap bootstrap = null;
private EventLoopGroup group;
public static Map<String, Object> responseMap = new ConcurrentHashMap<String, Object>();
public ProtocolActiveListener activeListener = null;
public ProtocolActiveListener getActiveListener() {
return activeListener;
}
public RPCClient(String id) {
this.id = id;
}
/**
 * Initializes the Bootstrap.
 *
 * @return the configured Bootstrap
 */
private Bootstrap getBootstrap() {
group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_SECONDS * 1000);
b.group(group).channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));// length-based framing, so split or merged TCP packets are reassembled
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast(new RPCEncoder(RPCRequest.class));// encode
pipeline.addLast(new RPCDecoder(RPCResponse.class));// decode
pipeline.addLast(new RPCClientHandler(responseMap, client));// handle
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
return b;
}
/**
 * Creates a client and initializes its Bootstrap.
 *
 * @param id the client id
 * @return the new RPCClient
 */
public static RPCClient init(String id) {
RPCClient client = new RPCClient(id);
client.client = client;
client.bootstrap = client.getBootstrap();
return client;
}
public RPCClient getClient() {
return client;
}
public boolean isActive() {
return client.getChannelFuture().channel().isActive();
}
public RPCClient doConnection() throws InterruptedException {
client.connection(HOST, PORT);
return client;
}
/**
 * Connects to the server.
 *
 * @param host the server host
 * @param port the server port
 * @return this RPCClient
 * @throws InterruptedException if the thread is interrupted while waiting for the connection
 */
public RPCClient connection(String host, int port) throws InterruptedException {
channelFuture = client.bootstrap.connect(host, port).sync();
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {// reconnect logic
if (future.isSuccess()) {
// connected: reset the retry counter to 0
connectAmount = 0;
if (null != activeListener) {// notify the client-state listener
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(true);
stateEvent.setClientId(id);
stateEvent.setClose(false);
stateEvent.setReConnect(true);
activeListener.clientStateEvent(stateEvent);
}
logger.info("Client connected.");
} else {
connectAmount++;
if (connectAmount == 3) {// stop retrying after 3 failed attempts
connectAmount = 0;
shutdown();// close the connection
} else {
future.channel().eventLoop().schedule(new Runnable() {
public void run() {
try {
doConnection();
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("------------failed to reconnect to the server--------");
}
}
}, RECONN_TIME_SECONDS, TimeUnit.SECONDS);
if (null != activeListener) {// notify the client-state listener
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(false);
stateEvent.setClientId(id);
stateEvent.setClose(false);
stateEvent.setReConnect(false);
activeListener.clientStateEvent(stateEvent);
}
}
}
}
});
return client;
}
public ChannelFuture getChannelFuture() {
return channelFuture;
}
/**
 * Sends a request and waits for its response.
 *
 * @param request the request to send
 * @return the response, or a response with state "305" on timeout
 * @throws Exception if sending fails
 */
public RPCResponse sendMsg(RPCRequest request) throws Exception {
if (null == request) {
throw new NullPointerException();
}
try {
RPCResponse response = new RPCResponse();
responseMap.put(request.getRequestId(), response);
if (channelFuture.channel().isOpen() && channelFuture.channel().isActive()) {
if (null != this.activeListener) {// notify the listener that the channel is in use
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(true);
stateEvent.setClientId(this.id);
stateEvent.setClose(false);
stateEvent.setReConnect(false);
stateEvent.setValid(false);
activeListener.clientStateEvent(stateEvent);
}
channelFuture.channel().writeAndFlush(request);
synchronized (response) {
if (null == response.getResponseId()) {
response.wait(TIMEOUT__SECONDS * 1000);
} else {
response.notifyAll();
}
if (null == response.getResponseId()) {
channelFuture.channel().close();
response = new RPCResponse();
response.setDate(new Date());
response.setResponseId(request.getRequestId());
response.setState("305");
response.setError(new RequestTimeOutException("request timed out"));
this.shutdown();// close the connection
} else {
if (null != this.activeListener) {// notify the listener that the channel is idle again
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(true);
stateEvent.setClientId(this.id);
stateEvent.setClose(false);
stateEvent.setReConnect(false);
stateEvent.setValid(true);
activeListener.clientStateEvent(stateEvent);
}
response = (RPCResponse) responseMap.get(request.getRequestId());
System.out.println(response.toString());
responseMap.remove(request.getRequestId());
}
}
}
return response;
} finally {
}
}
public void shutdown() throws InterruptedException {
channelFuture.channel().close();
group.shutdownGracefully();
if (null != activeListener) {// notify the client-state listener
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(false);
stateEvent.setClientId(this.id);
stateEvent.setClose(true);
activeListener.clientStateEvent(stateEvent);
}
logger.info("Client connection closed.");
}
public void addActiveListener(ProtocolActiveListener listener) {
this.activeListener = listener;
}
public String id() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
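sendMsg above pairs each request with its response through a shared map plus wait/notifyAll on the placeholder response. The same correlation by request id can also be expressed with `CompletableFuture`. The sketch below is an alternative technique, not the approach RPCClient takes; `PendingCallsDemo` and its methods are hypothetical, and responses are simplified to strings.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class PendingCallsDemo {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request before it is written to the channel. */
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called when a response arrives; responses with unknown ids are ignored. */
    void complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    /** Waits for the response and always removes the map entry, even on timeout. */
    String await(String requestId, CompletableFuture<String> future, long timeoutMillis) throws Exception {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(requestId);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingCallsDemo calls = new PendingCallsDemo();
        CompletableFuture<String> future = calls.register("req-1");
        // Simulates the I/O thread delivering the response.
        new Thread(() -> calls.complete("req-1", "pong")).start();
        System.out.println(calls.await("req-1", future, 1000));
    }
}
```

Removing the entry in a finally block keeps the map from growing when requests time out.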
RPCClientHandler.java
public class RPCClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
private RPCClient client = null;
private Logger logger = LoggerFactory.getLogger(RPCClientHandler.class);
public void setClient(RPCClient client) {
this.client = client;
}
private Map<String, Object> responseMap;
public RPCClientHandler(Map<String, Object> responseMap) {
this.responseMap = responseMap;
}
public RPCClientHandler(Map<String, Object> responseMap, RPCClient client) {
this.responseMap = responseMap;
this.client = client;
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, RPCResponse response) throws Exception {
if (response.getType() == 1) {// heartbeat from the server: reply with a heartbeat
if (null != this.client.getActiveListener()) {// notify the listener that the channel is idle
StateEvent stateEvent = new StateEvent();
stateEvent.setActice(true);
stateEvent.setClientId(this.client.id());
stateEvent.setClose(false);
stateEvent.setReConnect(false);
stateEvent.setValid(true);
this.client.getActiveListener().clientStateEvent(stateEvent);
}
RPCRequest request = new RPCRequest();
request.setType(1);
ctx.channel().writeAndFlush(request);
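} else {// response to an earlier request
RPCResponse rpcResponse = (RPCResponse) responseMap.get(response.getResponseId());
if (null == rpcResponse) {// no pending request with this id: nothing to wake up
return;
}
synchronized (rpcResponse) {
BeanUtils.copyProperties(rpcResponse, response);
rpcResponse.notifyAll();
}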
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
e.printStackTrace();
super.exceptionCaught(ctx, e);
if (e instanceof IOException) {// I/O error: close the channel
ctx.channel().close();
client.shutdown();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// reconnect to the server
ctx.channel().eventLoop().schedule(new Runnable() {
public void run() {
try {
client.doConnection();
} catch (InterruptedException e) {
logger.error("Failed to re-establish the connection after a disconnect: " + e.getMessage());
e.printStackTrace();
}
}
}, RPCClient.RECONN_TIME_SECONDS, TimeUnit.SECONDS);
}
}
3. Method invocation via reflection
RequestMethodInvoker.java
public class RequestMethodInvoker {

    public RPCResponse invoke(RPCRequest request) {
        if (null == request) {
            throw new NullPointerException();
        }
        RPCResponse response = new RPCResponse();
        response.setResponseId(request.getRequestId());
        response.setDate(new Date());
        String interfaceName = request.getInterfaceName();// interface name
        if (null == interfaceName || interfaceName.length() <= 0) {// interface name is empty
            response.setState("400");
            response.setError(new NullPointerException("interface name must not be empty"));
        } else {
            Object object = null;
            if (null != request.getVersion() && request.getVersion().length() > 0) {// a version is given
                object = ContextBeanUtil.getBean(interfaceName, request.getVersion());// look up the service object
            } else {
                object = ContextBeanUtil.getBean(interfaceName);// look up the service object
            }
            Object result;
            try {
                if (null == object) {
                    response.setError(new NotFindServiceException("service implementation not found"));
                    response.setState("405");
                } else {
                    result = MethodInvokeUtil.invoke(object, request.getParams(), request.getMethodName(), request.getParamTypes());
                    response.setResult(result);
                    response.setState("200");
                }
            } catch (IllegalAccessException e) {
                response.setState("410");
                response.setError(e);
            } catch (IllegalArgumentException e) {
                response.setState("415");
                response.setError(e);
            } catch (InvocationTargetException e) {
                response.setState("420");
                response.setError(e);
            } catch (NoSuchMethodException e) {
                response.setState("425");
                response.setError(e);
            } catch (SecurityException e) {
                response.setState("430");
                response.setError(e);
            } catch (ClassNotFoundException e) {
                response.setState("435");
                response.setError(e);
            }
        }
        return response;
    }
}
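ContextBeanUtil is not shown in this post. One way to picture a lookup by interface name and optional version is a map keyed on both values. This is a minimal sketch under that assumption; `ServiceRegistryDemo`, the `name:version` key format, and the sample names are hypothetical and may differ from how ContextBeanUtil actually resolves beans.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ServiceRegistryDemo {

    private final Map<String, Object> services = new ConcurrentHashMap<>();

    /** Builds the lookup key; the version part is omitted when no version is given. */
    private static String key(String interfaceName, String version) {
        return (version == null || version.isEmpty()) ? interfaceName : interfaceName + ":" + version;
    }

    /** Registers an implementation under an interface name and optional version. */
    void register(String interfaceName, String version, Object impl) {
        services.put(key(interfaceName, version), impl);
    }

    /** Returns the registered implementation, or null when none matches. */
    Object lookup(String interfaceName, String version) {
        return services.get(key(interfaceName, version));
    }

    public static void main(String[] args) {
        ServiceRegistryDemo registry = new ServiceRegistryDemo();
        // A string stands in for a real service object to keep the example short.
        registry.register("com.example.EchoService", "1.0", "echo-v1");
        System.out.println(registry.lookup("com.example.EchoService", "1.0")); // prints "echo-v1"
        System.out.println(registry.lookup("com.example.EchoService", "2.0")); // prints "null"
    }
}
```

A null result corresponds to the "405" branch above, where no service implementation is found.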
Summary: together, the pieces above implement the RPC functionality.
