使用 Netty 建立連接池有兩種方式:
1)使用netty自帶的
Netty自帶連接池的使用
/Users/joyce/work/jds/trade/trade-shenjinrong/jincePfyhServer com.jincetrade.pfyhserver.client.PfyhClientPool
2)自己for循環
教你正確地利用Netty建立連接池
演變:
a)
for (
bootstrap.group(new NioEventLoopGroup()).connect   // 每次循環都新建一個 EventLoopGroup
)
問題很明顯:如果每一個 channel 都對應一個 NioEventLoopGroup,那麼我們實際上構建了一個 connection:thread = 1:1 的模型,隨著連接數不斷擴大,線程膨脹的問題就會突顯出來。
而 NIO 本身的設計是由一個線程(EventLoop)處理多個連接,並不需要每個連接獨佔一個線程。
b)
bootstrap.group(eventLoopGroup)
for(
bootstrap.connect
)
本文在 2.b 方式的基礎上,結合 Apache Commons Pool 2 實現連接池:
package com.jds.test.proto; /** * Created by sunyuming on 19/8/13. */ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Date; import java.util.List; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * * EWrapper連接池工廠 * @author shane * */ public class ClientPool extends BasePooledObjectFactory<Channel>{ private String ip; private int port; public ClientPool() { ip = "127.0.0.1"; port = 8866; } public static void main(String[] args) throws Exception { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxWaitMillis(2000); config.setMinIdle(3); config.setMaxIdle(5); config.setMaxTotal(5); /** * 銷毀到minidle */ config.setSoftMinEvictableIdleTimeMillis(10000); config.setTestOnBorrow(true); /** * 銷毀所有 */ // config.setMinEvictableIdleTimeMillis(3000); // 詞句是必須的,否則(Soft)MinEvictableIdleTimeMillis不生效 config.setTimeBetweenEvictionRunsMillis(100); GenericObjectPool<Channel> objectPool = new GenericObjectPool<Channel>(new ClientPool(), config); // 先睡5s讓系統補足3個對象 
Thread.sleep(5000); List<Channel> list = new ArrayList<>(); for(int i=0; i<4; ++i) { list.add(objectPool.borrowObject()); } for(Channel channel : list) { objectPool.returnObject(channel); } /** * 休息8秒等待服務端斷開連接 */ Thread.sleep(8000); /** * 觸發activate,validate后失敗,觸發create */ objectPool.borrowObject(); } @Override public Channel create() throws Exception { ChannelFuture future = getBoot().connect(new InetSocketAddress("127.0.0.1",8866)).sync(); return future.channel(); } @Override public PooledObject<Channel> wrap(Channel channel) { System.out.println("創建" + new Date()); return new DefaultPooledObject<Channel>(channel); } /** * 對象銷毀 * @param pooledObject */ @Override public void destroyObject(PooledObject<Channel> pooledObject) throws Exception { System.out.println("銷毀" + new Date()); Channel channel = pooledObject.getObject(); channel.close(); super.destroyObject(pooledObject); } /** * 驗證對象有效性 * @param * @return */ @Override public boolean validateObject(PooledObject<Channel> pooledObject) { System.out.println("validate " + pooledObject.getObject().isActive() + new Date()); return pooledObject.getObject().isActive(); } private static Bootstrap bootstrap; public static Bootstrap getBoot() { if(bootstrap == null) { synchronized (ClientPool.class) { if (bootstrap == null) { EventLoopGroup worker = new NioEventLoopGroup(); //輔助啟動類 bootstrap = new Bootstrap(); //設置線程池 bootstrap.group(worker); //設置socket工廠 bootstrap.channel(NioSocketChannel.class); //設置管道 bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //獲取管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4)); pipeline.addLast(new ProtobufDecoder(MyBaseProto.BaseProto.getDefaultInstance())); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new IdleStateHandler(61, 30, 0, TimeUnit.SECONDS)); 
pipeline.addLast(new ClientHeartbeatHandler()); //處理類 pipeline.addLast(new ClientHandler4Heart()); } }); return bootstrap; } else { return bootstrap; } } } else { return bootstrap; } } }