設計模式在Netty 中的應用(回顧):
單例模式要點回顧:
- 一個類在任何情況下只有一個對象,並提供一個全局訪問點。
- 可延遲創建。
- 避免線程安全問題。
在我們利用netty自帶的容器來管理客戶端鏈接的NIOSocketChannel的時候我們會利用public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);來管理,這里就有單例的應用,而對於單例的線程安全模式最簡單的就是餓漢式。如下,當然在Netty中有很多地方都會應用到單例,這里只是舉類說明:
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { ...... public static final GlobalEventExecutor INSTANCE; static { SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1L); INSTANCE = new GlobalEventExecutor(); } ...... }
策略模式要點回顧:
- 封裝一系列可相互替換的算法家族。
- 動態選擇某一個策略。
在我們的NioEventLoopGroup初始化的時候,在其中創建了一個指定大小的EventExecutor數組,而選擇這個執行的過程正式利用了策略模式,而這個策略根據該數組大小是否是二次冪來決定:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } public EventExecutorChooser newChooser(EventExecutor[] executors) { return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) :
new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors)); } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)]; } } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[this.idx.getAndIncrement() & this.executors.length - 1]; } } }
裝飾者模式要點回顧:
- 裝飾者和被裝飾者實現同一個接口。
- 裝飾者通常繼承被裝飾者,同宗同源。
- 動態修改、重載被裝飾者的方法。
這是在一個不可釋放的Buf中的例子:
class WrappedByteBuf extends ByteBuf { protected final ByteBuf buf; protected WrappedByteBuf(ByteBuf buf) { if (buf == null) { throw new NullPointerException("buf"); } else { this.buf = buf; } } ...... }
final class UnreleasableByteBuf extends WrappedByteBuf {
private SwappedByteBuf swappedBuf;
UnreleasableByteBuf(ByteBuf buf) {
super(buf);
}
......
public boolean release() {
return false;
}
public boolean release(int decrement) {
return false;
}
}
觀察者模式要點回顧:
- 兩個角色:觀察者和被觀察者。
- 觀察者訂閱消息,被觀察者發布消息。
- 訂閱則能收到消息,取消訂閱則收不到。
這個例子是channel.writeAndFlush()方法:我們可以通過添加觀察者來監聽消息發送的結果,結果會被保存到ChannelFuture中:
future.channel().writeAndFlush(input).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("消息發送成功"); } });
迭代器模式要點回顧:
- 實現迭代器接口
- 實現對容器中的各個對象逐個訪問的方法。
復合ByteBuf:
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> { public byte getByte(int index) { return this._getByte(index); } }
責任鏈模式(可以說是Netty的大心臟了):
責任鏈:是指多個對象都有機會處理同一個請求,從而避免請求的發送者和接收者之間的耦合關系。然后,將這些對象連成一條鏈,並且沿着這條鏈往下傳遞請求,直到有一個對象可以處理它為止。在每個對象處理過程中,每個對象只處理它自己關心的那一部分,不相關的可以繼續往下傳遞,直到鏈中的某個對象不想處理,可以將請求終止或丟棄。責任鏈模式要點回顧:
- 需要有一個頂層責任處理接口(ChannelHandler)。
- 需要有動態創建鏈、添加和刪除責任處理器的接口(ChannelPipeline)。
- 需要有上下文機制(ChannelHandlerContext)。
- 需要有責任終止機制(不調用ctx.fireXXX()方法,則終止傳播)。
AbstractChannelHandlerContext:
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while(!ctx.inbound); return ctx; }
工廠模式要點回顧:
- 將創建對象的邏輯封裝起來。
ReflectiveChannelFactory:對於SocketChannel的初始化,正是利用了工廠模式進行反射初始化實例:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } else { this.clazz = clazz; } } public T newChannel() { try { return (Channel)this.clazz.newInstance(); } catch (Throwable var2) { throw new ChannelException("Unable to create Channel from class " + this.clazz, var2); } } public String toString() { return StringUtil.simpleClassName(this.clazz) + ".class"; } }
Netty 高性能並發調優
對於線程池的合理利用是提高程序性能的有效途徑之一,這里我通過線程池來測試Netty的性能,這里按照我們原來的代碼來啟動一個服務端:
public class Server { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { //自定義長度的解碼,每次發送一個long類型的長度數據 //一會每次傳遞一個系統的時間戳 ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
ch.pipeline().addLast(ServerHandler.INSTANCE);
} }); ChannelFuture channelFuture = bootstrap.bind(8080).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("bind success in port: " + port); } }); } }
這里唯一有變化的就是處理的ChannelHadler:
@ChannelHandler.Sharable public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final ChannelHandler INSTANCE = new ServerHandler();
//channelread0是主線程 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { ByteBuf data = Unpooled.directBuffer(); //從客戶端讀一個時間戳 data.writeBytes(msg); //模擬一次業務處理,有可能是數據庫操作,也有可能是邏輯處理 Object result = getResult(data); //重新寫會給客戶端 ctx.channel().writeAndFlush(result); } //模擬去數據庫拿到一個結果 protected Object getResult(ByteBuf data) { int level = ThreadLocalRandom.current().nextInt(1, 1000); //計算出每次響應需要的時間,用來做作為QPS的參考數據 //90.0% == 1ms 1000 100 > 1ms int time; if (level <= 900) { time = 1; //95.0% == 10ms 1000 50 > 10ms } else if (level <= 950) { time = 10; //99.0% == 100ms 1000 10 > 100ms } else if (level <= 990) { time = 100; //99.9% == 1000ms 1000 1 > 1000ms } else { time = 1000; } try { Thread.sleep(time); } catch (InterruptedException e) { } return data; } }
客戶端代碼:
public class Client { private static final String SERVER_HOST = "127.0.0.1"; public static void main(String[] args) throws Exception { new Client().start(8080); } public void start(int port) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_REUSEADDR, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES)); ch.pipeline().addLast(ClientHandler.INSTANCE); } }); //客戶端每秒鍾向服務端發起1000次請求 for (int i = 0; i < 1000; i++) { bootstrap.connect(SERVER_HOST, port).get(); } } }
·客戶端Handler:
@ChannelHandler.Sharable public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final ChannelHandler INSTANCE = new ClientHandler(); private static AtomicLong beginTime = new AtomicLong(0); //總響應時間 private static AtomicLong totalResponseTime = new AtomicLong(0); //總請求數 private static AtomicInteger totalRequest = new AtomicInteger(0); public static final Thread THREAD = new Thread(){ @Override public void run() { try { while (true) { long duration = System.currentTimeMillis() - beginTime.get(); if (duration != 0) { System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", " + "平均響應時間: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms."); Thread.sleep(2000); } } } catch (InterruptedException ignored) { } } }; @Override public void channelActive(final ChannelHandlerContext ctx) {
//上線,定時發送 ctx.executor().scheduleAtFixedRate(new Runnable() { public void run() { ByteBuf byteBuf = ctx.alloc().ioBuffer(); //將當前系統時間發送到服務端 byteBuf.writeLong(System.currentTimeMillis()); ctx.channel().writeAndFlush(byteBuf); } }, 0, 1, TimeUnit.SECONDS); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { //獲取一個響應時間差,本次請求的響應時間 totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong()); //每次自增 totalRequest.incrementAndGet(); //第一次是0 會進入這里,同事設置開始時間為當前系統時間,啟動線程 if (beginTime.compareAndSet(0, System.currentTimeMillis())) { THREAD.start(); } } }
通過測試我們會發現服務的性能是越來越差,這樣下去那么最后會導致無法再提供服務了:
接下去我們通過線程池去解決這個問題,重新寫一個Handler來處理請求(線程池大小經過測試,在我的機器上100左右為最佳機器性能決定線程池大小性能):
@ChannelHandler.Sharable public class ServerThreadPoolHandler extends ServerHandler { public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler(); private static ExecutorService threadPool = Executors.newFixedThreadPool(100); @Override protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) { final ByteBuf data = Unpooled.directBuffer(); data.writeBytes(msg); threadPool.submit(new Runnable() { public void run() { Object result = getResult(data); ctx.channel().writeAndFlush(result); } }); } }
利用線程池處理再來看性能結果,可以看到性能有非常好的提升:
除了自己定義的Handler中進行線程池的處理之外,Netty本身就給我們提供了這么一個機制,這個主要是在ch.pipeline().addLast(ServerHandler.INSTANCE);的時候指定一個線程池大小:
final EventLoopGroup businessGroup = new NioEventLoopGroup(100); ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE);
在然我們來看看自帶的線程池是否也能達到我們要的性能,可以看到性能也是有很明顯地提高的: