Netty實戰之性能調優與設計模式


設計模式在Netty 中的應用(回顧):

單例模式要點回顧:

  1. 一個類在任何情況下只有一個對象,並提供一個全局訪問點。
  2. 可延遲創建。
  3. 避免線程安全問題。

  在我們利用netty自帶的容器來管理客戶端鏈接的NIOSocketChannel的時候我們會利用public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);來管理,這里就有單例的應用,而對於單例的線程安全模式最簡單的就是餓漢式。如下,當然在Netty中有很多地方都會應用到單例,這里只是舉類說明:

public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
    ......
    public static final GlobalEventExecutor INSTANCE;
static {
        SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1L);
        INSTANCE = new GlobalEventExecutor();
    }
    ......
}

策略模式要點回顧:

  1. 封裝一系列可相互替換的算法家族。
  2. 動態選擇某一個策略。

  在我們的NioEventLoopGroup初始化的時候,在其中創建了一個指定大小的EventExecutor數組,而選擇這個執行的過程正式利用了策略模式,而這個策略根據該數組大小是否是二次冪來決定:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() {
    }

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : 
          new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors)); } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)]; } } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[this.idx.getAndIncrement() & this.executors.length - 1]; } } }

裝飾者模式要點回顧:

  1. 裝飾者和被裝飾者實現同一個接口。
  2. 裝飾者通常繼承被裝飾者,同宗同源。
  3. 動態修改、重載被裝飾者的方法。

  這是在一個不可釋放的Buf中的例子:

class WrappedByteBuf extends ByteBuf {
    protected final ByteBuf buf;

    protected WrappedByteBuf(ByteBuf buf) {
        if (buf == null) {
            throw new NullPointerException("buf");
        } else {
            this.buf = buf;
        }
    }
    ......
}
final class UnreleasableByteBuf extends WrappedByteBuf {
private SwappedByteBuf swappedBuf;

UnreleasableByteBuf(ByteBuf buf) {
super(buf);
}
  ......
  public boolean release() {
    return false;
  }
  public boolean release(int decrement) {
    return false;
  }
}

觀察者模式要點回顧:

  1. 兩個角色:觀察者和被觀察者。
  2. 觀察者訂閱消息,被觀察者發布消息。
  3. 訂閱則能收到消息,取消訂閱則收不到。

  這個例子是channel.writeAndFlush()方法:我們可以通過添加觀察者來監聽消息發送的結果,結果會被保存到ChannelFuture中:

future.channel().writeAndFlush(input).addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture channelFuture) throws Exception {
        System.out.println("消息發送成功");
   }
});

迭代器模式要點回顧:

  1. 實現迭代器接口
  2. 實現對容器中的各個對象逐個訪問的方法。

  復合ByteBuf:

public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
   public byte getByte(int index) {
        return this._getByte(index);
     }
}

責任鏈模式(可以說是Netty的大心臟了):

  責任鏈:是指多個對象都有機會處理同一個請求,從而避免請求的發送者和接收者之間的耦合關系。然后,將這些對象連成一條鏈,並且沿着這條鏈往下傳遞請求,直到有一個對象可以處理它為止。在每個對象處理過程中,每個對象只處理它自己關心的那一部分,不相關的可以繼續往下傳遞,直到鏈中的某個對象不想處理,可以將請求終止或丟棄。責任鏈模式要點回顧:

  1. 需要有一個頂層責任處理接口(ChannelHandler)。
  2. 需要有動態創建鏈、添加和刪除責任處理器的接口(ChannelPipeline)。
  3. 需要有上下文機制(ChannelHandlerContext)。
  4. 需要有責任終止機制(不調用ctx.fireXXX()方法,則終止傳播)。

  AbstractChannelHandlerContext:

private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;

        do {
            ctx = ctx.next;
        } while(!ctx.inbound);

        return ctx;
    }

工廠模式要點回顧:

  1. 將創建對象的邏輯封裝起來。

  ReflectiveChannelFactory:對於SocketChannel的初始化,正是利用了工廠模式進行反射初始化實例:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        } else {
            this.clazz = clazz;
        }
    }

    public T newChannel() {
        try {
            return (Channel)this.clazz.newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
        }
    }

    public String toString() {
        return StringUtil.simpleClassName(this.clazz) + ".class";
    }
}

Netty 高性能並發調優

  對於線程池的合理利用是提高程序性能的有效途徑之一,這里我通過線程池來測試Netty的性能,這里按照我們原來的代碼來啟動一個服務端:

public class Server {
public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                //自定義長度的解碼,每次發送一個long類型的長度數據
                //一會每次傳遞一個系統的時間戳
                ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
         
ch.pipeline().addLast(ServerHandler.INSTANCE);
        }
        });

        ChannelFuture channelFuture = bootstrap.bind(8080).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("bind success in port: " + port);
            }
        });
    }
}

  這里唯一有變化的就是處理的ChannelHadler:

@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final ChannelHandler INSTANCE = new ServerHandler();
//channelread0是主線程 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { ByteBuf data = Unpooled.directBuffer(); //從客戶端讀一個時間戳 data.writeBytes(msg); //模擬一次業務處理,有可能是數據庫操作,也有可能是邏輯處理 Object result = getResult(data); //重新寫會給客戶端 ctx.channel().writeAndFlush(result); } //模擬去數據庫拿到一個結果 protected Object getResult(ByteBuf data) { int level = ThreadLocalRandom.current().nextInt(1, 1000); //計算出每次響應需要的時間,用來做作為QPS的參考數據 //90.0% == 1ms 1000 100 > 1ms int time; if (level <= 900) { time = 1; //95.0% == 10ms 1000 50 > 10ms } else if (level <= 950) { time = 10; //99.0% == 100ms 1000 10 > 100ms } else if (level <= 990) { time = 100; //99.9% == 1000ms 1000 1 > 1000ms } else { time = 1000; } try { Thread.sleep(time); } catch (InterruptedException e) { } return data; } }

  客戶端代碼:

public class Client {

    private static final String SERVER_HOST = "127.0.0.1";

    public static void main(String[] args) throws Exception {
        new Client().start(8080);
    }
    public void start(int port) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
                        ch.pipeline().addLast(ClientHandler.INSTANCE);
                    }
        });

        //客戶端每秒鍾向服務端發起1000次請求
        for (int i = 0; i < 1000; i++) {
            bootstrap.connect(SERVER_HOST, port).get();
        }
    }
}

·客戶端Handler:

@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final ChannelHandler INSTANCE = new ClientHandler();

    private static AtomicLong beginTime = new AtomicLong(0);
    //總響應時間
    private static AtomicLong totalResponseTime = new AtomicLong(0);
    //總請求數
    private static AtomicInteger totalRequest = new AtomicInteger(0);

    public static final Thread THREAD = new Thread(){
        @Override
        public void run() {
            try {
                while (true) {
                    long duration = System.currentTimeMillis() - beginTime.get();
                    if (duration != 0) {
                        System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", " + "平均響應時間: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms.");
                        Thread.sleep(2000);
                    }
                }
            } catch (InterruptedException ignored) {
            }
        }
    };

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
     //上線,定時發送 ctx.executor().scheduleAtFixedRate(
new Runnable() { public void run() { ByteBuf byteBuf = ctx.alloc().ioBuffer(); //將當前系統時間發送到服務端 byteBuf.writeLong(System.currentTimeMillis()); ctx.channel().writeAndFlush(byteBuf); } }, 0, 1, TimeUnit.SECONDS); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { //獲取一個響應時間差,本次請求的響應時間 totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong()); //每次自增 totalRequest.incrementAndGet(); //第一次是0 會進入這里,同事設置開始時間為當前系統時間,啟動線程 if (beginTime.compareAndSet(0, System.currentTimeMillis())) { THREAD.start(); } } }

  通過測試我們會發現服務的性能是越來越差,這樣下去那么最后會導致無法再提供服務了:

  接下去我們通過線程池去解決這個問題,重新寫一個Handler來處理請求(線程池大小經過測試,在我的機器上100左右為最佳機器性能決定線程池大小性能):

@ChannelHandler.Sharable
public class ServerThreadPoolHandler extends ServerHandler {
    public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) {
        final ByteBuf data = Unpooled.directBuffer();
        data.writeBytes(msg);
        threadPool.submit(new Runnable() {
            public void run() {
                Object result = getResult(data);
                ctx.channel().writeAndFlush(result);
            }
        });
    }
}

  利用線程池處理再來看性能結果,可以看到性能有非常好的提升:

  除了自己定義的Handler中進行線程池的處理之外,Netty本身就給我們提供了這么一個機制,這個主要是在ch.pipeline().addLast(ServerHandler.INSTANCE);的時候指定一個線程池大小:

final EventLoopGroup businessGroup = new NioEventLoopGroup(100);
ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE);

  在然我們來看看自帶的線程池是否也能達到我們要的性能,可以看到性能也是有很明顯地提高的:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM