关于Netty,就不过多的介绍了。如果你重来没有接触过Netty,可以看一看这篇文章:
https://www.jianshu.com/p/b9f3f6a16911
首先,我们需要抽象出一个引导类,这个类的作用是引导服务的启动或者引导客户端的链接发起。这是一个基类,可以继承它实现各自的需求。
/** * * 基础的引导类 * @author Lv 2018/1/20 */ public class HBootstrap { public static final String ATTR_ID = "id"; public static final String BOOTSTRAP = "bootstrap"; /** * boss */ protected EventLoopGroup bossGroup; /** * work */ protected EventLoopGroup workGroup; protected HServer server; public HBootstrap() { } /** * 需要先设置这两个值,再调用start * @param bossGroup * @param workGroup */ public HBootstrap setEventLoopGroup(EventLoopGroup bossGroup, EventLoopGroup workGroup) { this.bossGroup = bossGroup; this.workGroup = workGroup; return this; } public HServer getServer() { return server; } public void setServer(HServer server) { this.server = server; } public void start() { } public void destroy() { } public int getId() { return 0; } }
这个类比较简单,所有的字段只有两个线程池(一个Boss 和 一个Worker) 和 HServer。线程池将会在外部构建这个类的时候注入进来,因为我们想让我所的HBootstrap 共用相同的线程池。另外在同一个进程里面可能会开多个端口(Server身份)或者需要发起多个Socket链接(Client的身份),这个时候在一个进程里面就需要开启个多HBootstrap。HServer 就是他们的管理者。
package tales.network.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; public class HServer { /** * boss 线程池 */ protected EventLoopGroup boss; /** * worker 线程池 */ protected EventLoopGroup worker; /** * 是否已经在运行 */ protected volatile boolean isRun = false; /** * 所有的HBootstrap */ protected Map<Integer,HBootstrap> bootstrapMap = new HashMap<>(); protected static Logger logger = LoggerFactory.getLogger(HServer.class); /** * 初始化线程池数量 * @param bossCount * @param workerCount */ public synchronized void initEventLoopGroup(int bossCount,int workerCount) { if(boss != null && worker != null) return; if(bossCount > 0) { boss = new NioEventLoopGroup(bossCount); } worker = new NioEventLoopGroup(workerCount); } /** * 也可以通过外部注入进来 * @param boss * @param worker */ public void group(EventLoopGroup boss,EventLoopGroup worker) { this.boss = boss; this.worker = worker; } public <T> T getAttributeId(ChannelHandlerContext ctx) { AttributeKey<T> key = AttributeKey.valueOf(HBootstrap.ATTR_ID); return ctx.channel().attr(key).get(); } public synchronized void addBootstrap(HBootstrap bootstrap) { if(!bootstrapMap.containsKey(bootstrap.getId())) { bootstrapMap.put(bootstrap.getId(),bootstrap); } bootstrap.server = this; if(isRun) { bootstrap.start(); } logger.info("add bootstrap:" + bootstrap.getId()); } public synchronized void removeBootstrap(long id) { HBootstrap bootstrap = bootstrapMap.remove(id); if(bootstrap != null) { bootstrap.server = null; bootstrap.destroy(); } logger.info("remove bootstrap:",id); } /** * 遍历HBootstrap * @param consumer */ public synchronized void each(Consumer<HBootstrap> consumer) { for (Map.Entry<Integer, HBootstrap> entry : bootstrapMap.entrySet()) { consumer.accept(entry.getValue()); } } /** * 开始启动 */ public synchronized void start() { if(this.isRun) return; this.isRun = true; for (HBootstrap bootstrap : bootstrapMap.values()) { bootstrap.setEventLoopGroup(boss,worker).start(); } } /** * 销毁 */ public synchronized void destroy() { if(!this.isRun) return; this.isRun = false; for (HBootstrap bootstrap : bootstrapMap.values()) { bootstrap.destroy(); } if (this.boss != null) { this.boss.shutdownGracefully(); this.boss = null; } if(this.worker != null) { this.worker.shutdownGracefully(); this.worker = null; } } }
基本上就是这样。通过initEventLoopGroup 初始化线程数量,如果bossCount填0的话表示没有boos线程池,这种一般用做纯客户端实现。接下来,就可以封装HServerBootstrap 和 HSocketBootstrap。
package tales.network.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 服务器引导 * Created by Lv on 2016/6/12. */ public abstract class HServerBootstrap extends HBootstrap { protected static Logger logger = LoggerFactory.getLogger(HServerBootstrap.class); private ServerBootstrap bootstrap; private ChannelFuture future; /** * id号 */ protected int id; /** * 端口 */ protected int port; /** * ip */ protected String host; public HServerBootstrap() { this.bootstrap = new ServerBootstrap(); this.bootstrap.childAttr(AttributeKey.valueOf(ATTR_ID),this.getId()); this.bootstrap.childAttr(AttributeKey.valueOf(BOOTSTRAP),this); } public void setId(int id) { this.id = id; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public void setPort(int port) { this.port = port; } public <T> HServerBootstrap addAttribute(String name, T value) { bootstrap.childAttr(AttributeKey.valueOf(name),value); return this; } @Override public void start() { bootstrap.group(this.bossGroup,this.workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { onInitChildChannel(ch,bootstrap); } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.SO_REUSEADDR,true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { this.future = this.bootstrap.bind(this.host,this.port).sync(); logger.info("创建socket监听成功=> " + this.host + ":" + this.port); } catch (InterruptedException e) { logger.error("创建socket监听出错=> " + this.host + ":" + this.port); logger.error("",e); } } protected abstract void onInitChildChannel(Channel ch, ServerBootstrap bootstrap); public void destroy() { try { future.channel().close(); future.get(); } catch (Exception e) { logger.error("",e); } } public int getPort() { return port; } @Override public int getId() { return id; } }
基本的逻辑都在start里面。需要注入host 和 端口,然后start方法会去listen相应的端口。另外有一个abstract方法(onInitChildChannel)用于子类重写,绑定相应的包解码器。下面是HSocketBootstrap
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; /** * Socket链接器 */ public abstract class HSocketBootstrap extends HBootstrap { protected static Logger logger = LoggerFactory.getLogger(HSocketBootstrap.class); protected final AtomicBoolean isConnecting; protected Bootstrap bootstrap; protected String host; protected int port; protected ChannelFuture future; public HSocketBootstrap() { this.bootstrap = new Bootstrap(); this.bootstrap.attr(AttributeKey.valueOf(ATTR_ID),this.getId()); this.bootstrap.attr(AttributeKey.valueOf(BOOTSTRAP),this); this.isConnecting = new AtomicBoolean(false); } public <T> HSocketBootstrap addAttribute(String name, T value) { bootstrap.attr(AttributeKey.valueOf(name),value); return this; } @Override public void start() { bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.group(workGroup); bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { onInitChannel(ch); } }); this.connect(); } public HSocketBootstrap connect() { if(isConnecting.compareAndSet(false,true)){ ChannelFuture connect = bootstrap.connect(host, port); connect.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { connect.removeListener(this); isConnecting.compareAndSet(true,false); logger.info("connect to server=>{}:{} complete=>{}",host,port,future.isSuccess()); } }); this.future = connect; } return this; } protected abstract void onInitChannel(Channel ch); public void destroy() { try { future.channel().close(); future.get(); } catch (Exception e) { logger.error("",e); } } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
明天开始写解码器喽~