yls 2020/5/23
netty 實現簡單rpc准備
- 使用netty傳輸java bean對象,可以使用protobuf,也可以通過json轉化
- 客戶端要將調用的接口名稱,方法名稱,參數列表的類型和值傳輸到服務端,
可以用動態代理 - 服務端要對接口和實現類進行映射(或者自定義名稱與實現類映射),接收到客戶端的數據,使用反射調用相關類的函數
- 客戶端使用callable返回調用的結果,先等待,有數據寫回后喚醒線程,賦值返回
基於netty編碼實現 rpc 調用
大致流程:
- netty搭建rpc框架;
- 創建服務消費者和服務提供者的公共接口和類
- 創建服務提供者,啟動netty框架的服務端
- 創建服務消費者,啟動netty框架的客戶端,然后獲取調用結果
1.首先用netty實現一個rpc框架
1.1 創建客戶端調用服務端時傳輸信息的類
/**
* rpc調用時傳輸類的信息
* 客戶端與服務端之間通信,傳遞信息的媒介
*/
public class ClassInfo {
//自定義name,一般一個接口有多個實現類的時候使用自定義
// 或者默認使用接口名稱
private String name;
private String methodName;
//參數類型
private Class[] types;
//參數列表
private Object[] params;
//自定義rpc協議
private String protocol="#rpc#";
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class[] getTypes() {
return types;
}
public void setTypes(Class<?>[] types) {
this.types = types;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
1.2 創建解決TCP粘包拆包的編解碼器
/**
* 編碼器
* MyMessageEncoder MyMessageDecoder解決粘包拆包問題
*/
public class MyMessageEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
//先發送內容長度
out.writeInt(msg.getBytes().length);
//發送具體的內容
out.writeBytes(msg.getBytes());
}
}
/**
* 解碼器
*/
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//先讀取要接收的字節長度
final int len = in.readInt();
final byte[] bytes = new byte[len];
//再根據長度讀取真正的字節數組
in.readBytes(bytes);
String s = new String(bytes);
out.add(s);
}
}
1.3 創建netty客戶端以及自定義的處理器
public class NettyClient {
private static NettyClientHandler nettyClientHandler;
static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
public static <T> T getBean(Class<T> service) {
String simpleName = service.getSimpleName();
return getBean(service, simpleName);
}
//獲取一個動態代理對象
public static <T> T getBean(Class<T> service, String name) {
T o = (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, ((proxy, method, args1) -> {
//先建立連接
if (nettyClientHandler == null) {
start(ClientBootStrap.getHost()
, ClientBootStrap.getPort());
}
//組裝傳輸類的屬性值
ClassInfo classInfo = new ClassInfo();
classInfo.setName(name);
classInfo.setMethodName(method.getName());
Class<?>[] parameterTypes = method.getParameterTypes();
classInfo.setTypes(parameterTypes);
classInfo.setParams(args1);
nettyClientHandler.setClassInfo(classInfo);
//運行線程,發送數據
Future future = threadPool.submit(nettyClientHandler);
//返回結果
String o1 = (String) future.get();
ObjectMapper objectMapper = new ObjectMapper();
//獲取返回類型,並將服務端返回的json數據轉化為對應的類型
Type returnType = method.getAnnotatedReturnType().getType();
Object o2 = objectMapper.readValue(o1, (Class<?>) returnType);
return o2;
}));
return o;
}
//啟動netty客戶端
public static void start(String host, int port) {
nettyClientHandler = new NettyClientHandler();
//客戶端需要一個事件循環組就可以
NioEventLoopGroup group = new NioEventLoopGroup(1);
try {
//創建客戶端的啟動對象 bootstrap ,不是 serverBootStrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數
bootstrap.group(group) //設置線程組
.channel(NioSocketChannel.class) //設置客戶端通道的實現數 (反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new MyMessageDecoder())
.addLast(new MyMessageEncoder())
.addLast(nettyClientHandler); //加入自己的處理器
}
});
System.out.println("客戶端 ready is ok..");
//連接服務器
final ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//對關閉通道進行監聽
// channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// group.shutdownGracefully();
}
}
}
/**
* 由於需要在 handler 中發送消息給服務端,並且將服務端返回的消息讀取后返回給消費者
* 所以實現了 Callable 接口,這樣可以運行有返回值的線程
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ClassInfo classInfo; //傳遞數據的類
private ChannelHandlerContext context;//上下文
private Object result;//服務端返回的結果
private Lock lock = new ReentrantLock();//使用鎖將 channelRead和 call 函數同步
private Condition condition = lock.newCondition();//精准喚醒 call中的等待
public void setClassInfo(ClassInfo classInfo) {
this.classInfo = classInfo;
}
//通道連接時,就將上下文保存下來,因為這樣其他函數也可以用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive 被調用。。。");
}
//當服務端返回消息時,將消息復制到類變量中,然后喚醒正在等待結果的線程,返回結果
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lock.lock();
System.out.println(ctx.channel().hashCode());
System.out.println("收到服務端發送的消息 " + msg);
result = msg;
//喚醒等待的線程
condition.signal();
lock.unlock();
}
//這里面發送數據到服務端,等待channelRead方法接收到返回的數據時,將數據返回給服務消費者
@Override
public Object call() throws Exception {
lock.lock();
ObjectMapper objectMapper = new ObjectMapper();
final String s = objectMapper.writeValueAsString(classInfo);
context.writeAndFlush(s);
System.out.println("發出數據 " + s);
//向服務端發送消息后等待channelRead中接收到消息后喚醒
condition.await();
lock.unlock();
return result;
}
//異常處理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
1.4 創建netty服務端以及自定義的處理器
public class NettyServer {
//啟動netty服務端
public static void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創建服務端的啟動對象,並使用鏈式編程來設置參數
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 作為服務器的通道實現
.option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的連接個數
.childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動連接狀態
.childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//給pipeline設置通道處理器
ch.pipeline()
.addLast(new MyMessageDecoder())
.addLast(new MyMessageEncoder())
.addLast(new NettyServerHandler());
}
});//給 workerGroup 的EventLoop對應的管道設置處理器
//啟動服務器,並綁定端口並且同步
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//給 channelFuture 注冊監聽器,監聽關心的事件,異步的時候使用
// channelFuture.addListener((future) -> {
// if (future.isSuccess()) {
// System.out.println("監聽端口成功。。。");
// } else {
// System.out.println("監聽端口失敗。。。");
// }
// });
//對關閉通道進行監聽,監聽到通道關閉后,往下執行
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public static Map<String, Class<?>> classNameMapping = new HashMap();
public static void setClassNameMapping(Object object) {
Class<?> clazz = object.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
Class<?> anInterface = interfaces[0];
setClassNameMapping(anInterface.getSimpleName(), object);
}
//為實現類定義標識,方便客戶端和服務端通信調用
public static void setClassNameMapping(String name, Object object) {
Class<?> clazz = object.getClass();
classNameMapping.put(name, clazz);
}
//接收客戶端傳入的值,將值解析為類對象,獲取其中的屬性,然后反射調用實現類的方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = (String) msg;
System.out.println("接收到數據 " + s);
ObjectMapper objectMapper = new ObjectMapper();
ClassInfo classInfo = objectMapper.readValue(s, ClassInfo.class);
//確認是rpc調用才往下執行
if(classInfo!=null && "#rpc#".equals(classInfo.getProtocol())){
//反射調用實現類的方法
String name = classInfo.getName();
//獲取指定名稱的實現類
Class<?> aClass = classNameMapping.get(name);
Object o = aClass.newInstance();
if (classInfo.getTypes().length > 0) {
Method method = aClass.getMethod(classInfo.getMethodName(), classInfo.getTypes());
method.setAccessible(true);
Object invoke = method.invoke(o, classInfo.getParams());
String s1 = objectMapper.writeValueAsString(invoke);
ctx.writeAndFlush(s1);
} else {
Method method = aClass.getMethod(classInfo.getMethodName());
method.setAccessible(true);
Object invoke = method.invoke(o);
String s1 = objectMapper.writeValueAsString(invoke);
ctx.writeAndFlush(s1);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2.創建服務消費者和服務提供者的公共接口和類
public interface HelloService {
Result hello(String s);
String str();
}
/**
* 測試返回結果為java bean時使用的類
*/
public class Result {
private int id;
private String content;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
3.創建服務提供者
3.1 服務提供者實現公共接口
public class HelloServiceImpl implements HelloService {
@Override
public Result hello(String s) {
System.out.println("收到消費者的請求。。" + s);
Result result=new Result();
result.setId(1);
result.setContent("你好,我已經收到了你的消費請求");
return result;
}
@Override
public String str() {
return "我是一個字符串。。。";
}
}
3.2 啟動netty框架的服務端
public class ServerBootStrap {
public static void main(String[] args) {
NettyServerHandler.setClassNameMapping(new HelloServiceImpl());
NettyServer.start(9999);
}
}
4.創建服務消費者,啟動netty框架的客戶端,然后獲取調用結果
/**
* 消費者
*/
public class ClientBootStrap {
private static String host = "127.0.0.1";
private static int port = 9999;
public static String getHost() {
return host;
}
public static int getPort() {
return port;
}
public static void main(String[] args) {
//連接netty,並獲得一個代理對象
HelloService bean = NettyClient.getBean(HelloService.class);
//測試返回結果為java bean
Result res = bean.hello("ffafa");
System.out.println("res=====" + res.getContent());
//測試返回結果為 String
String str = bean.str();
System.out.println("str=====" + str);
}
}