添加依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.2.Final</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
組織架構

服務端
封裝類信息
public class ClassInfo implements Serializable {
private static final long serialVersionUID = 1L;
private String className; //類名
private String methodName;//方法名
private Class<?>[] types; //參數類型
private Object[] objects;//參數列表
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypes() {
return types;
}
public void setTypes(Class<?>[] types) {
this.types = types;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
}
服務端網絡處理服務器
public class NettyRPCServer {
private int port;
public NettyRPCServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(port).childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//編碼器
pipeline.addLast("encoder", new ObjectEncoder());
//解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//服務器端業務處理類
pipeline.addLast(new InvokeHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("......server is ready......");
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyRPCServer(9999).start();
}
}
服務器端業務處理類
public class InvokeHandler extends ChannelInboundHandlerAdapter {
//得到某接口下某個實現類的名字
private String getImplClassName(ClassInfo classInfo) throws Exception{
//服務方接口和實現類所在的包路徑
String interfacePath="com.lyz.server";
int lastDot = classInfo.getClassName().lastIndexOf(".");
String interfaceName=classInfo.getClassName().substring(lastDot);
Class superClass=Class.forName(interfacePath+interfaceName);
Reflections reflections = new Reflections(interfacePath);
//得到某接口下的所有實現類
Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
if(ImplClassSet.size()==0){
System.out.println("未找到實現類");
return null;
}else if(ImplClassSet.size()>1){
System.out.println("找到多個實現類,未明確使用哪一個");
return null;
}else {
//把集合轉換為數組
Class[] classes=ImplClassSet.toArray(new Class[0]);
return classes[0].getName(); //得到實現類的名字
}
}
@Override //讀取客戶端發來的數據並通過反射調用實現類的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo) msg;
System.out.println(classInfo);
Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
//通過反射調用實現類的方法
Object result = method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
}
}
服務端接口及實現類
// 無參接口
public interface HelloNetty {
String hello();
}
// 實現類
public class HelloNettyImpl implements HelloNetty {
@Override
public String hello() {
return "hello,netty";
}
}
// 帶參接口
public interface HelloRPC {
String hello(String name);
}
// 實現類
public class HelloRPCImpl implements HelloRPC {
@Override
public String hello(String name) {
return "hello," + name;
}
}
客戶端
代理類
public class NettyRPCProxy {
//根據接口創建代理對象
public static Object create(Class target) {
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
//封裝ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
//開始用Netty發送數據
EventLoopGroup group = new NioEventLoopGroup();
ResultHandler resultHandler = new ResultHandler();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//編碼器
pipeline.addLast("encoder", new ObjectEncoder());
//解碼器 構造方法第一個參數設置二進制數據的最大字節數 第二個參數設置具體使用哪個類解析器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//客戶端業務處理類
pipeline.addLast("handler", resultHandler);
}
});
ChannelFuture future = b.connect("127.0.0.1", 9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
客戶端業務處理類
public class ResultHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override //讀取服務器端返回的數據(遠程調用的結果)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
客戶端接口
// 無參接口
public interface HelloNetty {
String hello();
}
// 帶參接口
public interface HelloRPC {
String hello(String name);
}
測試類 服務調用方
public class TestNettyRPC {
public static void main(String [] args){
//第1次遠程調用
HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);
System.out.println(helloNetty.hello());
//第2次遠程調用
HelloRPC helloRPC = (HelloRPC) NettyRPCProxy.create(HelloRPC.class);
System.out.println(helloRPC.hello("RPC"));
}
}
輸出結果
服務端
......server is ready...... com.lyz.serverStub.ClassInfo@2b894733 com.lyz.serverStub.ClassInfo@167bfa9
客戶端
hello,netty hello,RPC
下一篇通過netty實現線上聊天功能
