關於Netty,就不過多的介紹了。如果你從來沒有接觸過Netty,可以看一看這篇文章:
https://www.jianshu.com/p/b9f3f6a16911
首先,我們需要抽象出一個引導類,這個類的作用是引導服務的啟動或者引導客戶端的鏈接發起。這是一個基類,可以繼承它實現各自的需求。
/**
 * Base bootstrap class.
 *
 * <p>Holds the boss/worker event loop groups and the {@link HServer} this bootstrap is
 * attached to. {@link #start()} and {@link #destroy()} are empty here and {@link #getId()}
 * returns {@code 0}; subclasses override them to provide real behavior.
 *
 * @author Lv 2018/1/20
 */
public class HBootstrap {

    /** Attribute name used for the bootstrap id. */
    public static final String ATTR_ID = "id";

    /** Attribute name used for the bootstrap instance itself. */
    public static final String BOOTSTRAP = "bootstrap";

    /** Boss event loop group. */
    protected EventLoopGroup bossGroup;

    /** Worker event loop group. */
    protected EventLoopGroup workGroup;

    /** The server this bootstrap currently belongs to, if any. */
    protected HServer server;

    public HBootstrap() {
    }

    /**
     * Stores both event loop groups. Call this before {@link #start()}.
     *
     * @param bossGroup the boss group
     * @param workGroup the worker group
     * @return this bootstrap, for chaining
     */
    public HBootstrap setEventLoopGroup(EventLoopGroup bossGroup, EventLoopGroup workGroup) {
        this.workGroup = workGroup;
        this.bossGroup = bossGroup;
        return this;
    }

    /**
     * @return the server this bootstrap is attached to, or {@code null} if none was set
     */
    public HServer getServer() {
        return this.server;
    }

    /**
     * @param server the server this bootstrap is attached to
     */
    public void setServer(HServer server) {
        this.server = server;
    }

    /** Lifecycle hook; does nothing in the base class. */
    public void start() {
    }

    /** Lifecycle hook; does nothing in the base class. */
    public void destroy() {
    }

    /**
     * @return the bootstrap id; always {@code 0} in the base class
     */
    public int getId() {
        return 0;
    }
}
這個類比較簡單,所有的字段只有兩個線程池(一個Boss 和 一個Worker) 和 HServer。線程池將會在外部構建這個類的時候注入進來,因為我們想讓所有的HBootstrap 共用相同的線程池。另外在同一個進程里面可能會開多個端口(Server身份)或者需要發起多個Socket鏈接(Client的身份),這個時候在一個進程里面就需要開啟多個HBootstrap。HServer 就是它們的管理者。
package tales.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Owns the shared event loop groups and manages a set of {@link HBootstrap} instances
 * keyed by {@link HBootstrap#getId()}.
 *
 * <p>All methods that touch the bootstrap map are {@code synchronized} on this instance.
 */
public class HServer {

    /** Boss event loop group; stays {@code null} when no boss threads were requested. */
    protected EventLoopGroup boss;

    /** Worker event loop group. */
    protected EventLoopGroup worker;

    /** Whether {@link #start()} has been called without a subsequent {@link #destroy()}. */
    protected volatile boolean isRun = false;

    /** All registered bootstraps, keyed by their id. */
    protected Map<Integer, HBootstrap> bootstrapMap = new HashMap<>();

    protected static Logger logger = LoggerFactory.getLogger(HServer.class);

    /**
     * Creates the event loop groups that do not exist yet.
     *
     * <p>A group that is already present (created earlier or injected through
     * {@link #group(EventLoopGroup, EventLoopGroup)}) is left untouched, so calling this
     * method repeatedly never replaces, and therefore never leaks, an existing group.
     *
     * @param bossCount   number of boss threads; {@code 0} or less means no boss group is created
     * @param workerCount number of worker threads
     */
    public synchronized void initEventLoopGroup(int bossCount, int workerCount) {
        if (boss == null && bossCount > 0) {
            boss = new NioEventLoopGroup(bossCount);
        }
        if (worker == null) {
            worker = new NioEventLoopGroup(workerCount);
        }
    }

    /**
     * Injects externally created event loop groups instead of creating them here.
     *
     * @param boss   the boss group
     * @param worker the worker group
     */
    public void group(EventLoopGroup boss, EventLoopGroup worker) {
        this.boss = boss;
        this.worker = worker;
    }

    /**
     * Reads the channel attribute named {@link HBootstrap#ATTR_ID} from the given context.
     *
     * @param ctx the handler context whose channel is inspected
     * @param <T> the expected attribute type
     * @return the attribute value, as returned by the channel's attribute
     */
    public <T> T getAttributeId(ChannelHandlerContext ctx) {
        AttributeKey<T> key = AttributeKey.valueOf(HBootstrap.ATTR_ID);
        return ctx.channel().attr(key).get();
    }

    /**
     * Registers a bootstrap and, if the server is already running, starts it immediately
     * with the server's event loop groups.
     *
     * <p>If a bootstrap with the same id is already registered, the new one is rejected
     * with a warning: it could not be tracked, so it could never be destroyed by this server.
     *
     * @param bootstrap the bootstrap to register
     */
    public synchronized void addBootstrap(HBootstrap bootstrap) {
        int id = bootstrap.getId();
        if (bootstrapMap.putIfAbsent(id, bootstrap) != null) {
            logger.warn("bootstrap id {} is already registered, ignored", id);
            return;
        }
        bootstrap.server = this;
        if (isRun) {
            // Same sequence as start(): the groups must be injected before starting.
            bootstrap.setEventLoopGroup(boss, worker).start();
        }
        logger.info("add bootstrap:" + id);
    }

    /**
     * Unregisters and destroys the bootstrap with the given id, if present.
     *
     * @param id the bootstrap id; values outside the {@code int} range match nothing
     */
    public synchronized void removeBootstrap(long id) {
        HBootstrap bootstrap = null;
        if (id >= Integer.MIN_VALUE && id <= Integer.MAX_VALUE) {
            // The map is keyed by Integer; a boxed Long would never equal any key.
            bootstrap = bootstrapMap.remove(Integer.valueOf((int) id));
        }
        if (bootstrap != null) {
            bootstrap.server = null;
            bootstrap.destroy();
        }
        logger.info("remove bootstrap:{}", id);
    }

    /**
     * Invokes the consumer once for every registered bootstrap.
     *
     * @param consumer the action to apply to each bootstrap
     */
    public synchronized void each(Consumer<HBootstrap> consumer) {
        for (Map.Entry<Integer, HBootstrap> entry : bootstrapMap.entrySet()) {
            consumer.accept(entry.getValue());
        }
    }

    /**
     * Starts every registered bootstrap with the server's event loop groups.
     * Does nothing if the server is already running.
     */
    public synchronized void start() {
        if (this.isRun) {
            return;
        }
        this.isRun = true;
        for (HBootstrap bootstrap : bootstrapMap.values()) {
            bootstrap.setEventLoopGroup(boss, worker).start();
        }
    }

    /**
     * Destroys every registered bootstrap and shuts both event loop groups down.
     * Does nothing if the server is not running.
     */
    public synchronized void destroy() {
        if (!this.isRun) {
            return;
        }
        this.isRun = false;
        for (HBootstrap bootstrap : bootstrapMap.values()) {
            bootstrap.destroy();
        }
        if (this.boss != null) {
            this.boss.shutdownGracefully();
            this.boss = null;
        }
        if (this.worker != null) {
            this.worker.shutdownGracefully();
            this.worker = null;
        }
    }
}
基本上就是這樣。通過initEventLoopGroup 初始化線程數量,如果bossCount填0的話表示沒有boss線程池,這種一般用做純客戶端實現。接下來,就可以封裝HServerBootstrap 和 HSocketBootstrap。
package tales.network.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 服務器引導 * Created by Lv on 2016/6/12. */ public abstract class HServerBootstrap extends HBootstrap { protected static Logger logger = LoggerFactory.getLogger(HServerBootstrap.class); private ServerBootstrap bootstrap; private ChannelFuture future; /** * id號 */ protected int id; /** * 端口 */ protected int port; /** * ip */ protected String host; public HServerBootstrap() { this.bootstrap = new ServerBootstrap(); this.bootstrap.childAttr(AttributeKey.valueOf(ATTR_ID),this.getId()); this.bootstrap.childAttr(AttributeKey.valueOf(BOOTSTRAP),this); } public void setId(int id) { this.id = id; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public void setPort(int port) { this.port = port; } public <T> HServerBootstrap addAttribute(String name, T value) { bootstrap.childAttr(AttributeKey.valueOf(name),value); return this; } @Override public void start() { bootstrap.group(this.bossGroup,this.workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { onInitChildChannel(ch,bootstrap); } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.SO_REUSEADDR,true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { this.future = 
this.bootstrap.bind(this.host,this.port).sync(); logger.info("創建socket監聽成功=> " + this.host + ":" + this.port); } catch (InterruptedException e) { logger.error("創建socket監聽出錯=> " + this.host + ":" + this.port); logger.error("",e); } } protected abstract void onInitChildChannel(Channel ch, ServerBootstrap bootstrap); public void destroy() { try { future.channel().close(); future.get(); } catch (Exception e) { logger.error("",e); } } public int getPort() { return port; } @Override public int getId() { return id; } }
基本的邏輯都在start里面。需要注入host 和 端口,然後start方法會去listen相應的端口。另外有一個abstract方法(onInitChildChannel)用於子類重寫,綁定相應的包解碼器。下面是HSocketBootstrap:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; /** * Socket鏈接器 */ public abstract class HSocketBootstrap extends HBootstrap { protected static Logger logger = LoggerFactory.getLogger(HSocketBootstrap.class); protected final AtomicBoolean isConnecting; protected Bootstrap bootstrap; protected String host; protected int port; protected ChannelFuture future; public HSocketBootstrap() { this.bootstrap = new Bootstrap(); this.bootstrap.attr(AttributeKey.valueOf(ATTR_ID),this.getId()); this.bootstrap.attr(AttributeKey.valueOf(BOOTSTRAP),this); this.isConnecting = new AtomicBoolean(false); } public <T> HSocketBootstrap addAttribute(String name, T value) { bootstrap.attr(AttributeKey.valueOf(name),value); return this; } @Override public void start() { bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.group(workGroup); bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { onInitChannel(ch); } }); this.connect(); } public HSocketBootstrap connect() { if(isConnecting.compareAndSet(false,true)){ ChannelFuture connect = bootstrap.connect(host, port); connect.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? 
super Void> future) throws Exception { connect.removeListener(this); isConnecting.compareAndSet(true,false); logger.info("connect to server=>{}:{} complete=>{}",host,port,future.isSuccess()); } }); this.future = connect; } return this; } protected abstract void onInitChannel(Channel ch); public void destroy() { try { future.channel().close(); future.get(); } catch (Exception e) { logger.error("",e); } } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
明天開始寫解碼器嘍~