Netty client 多路復用 連接池


有兩種方式:

1)使用netty自帶的

Netty自帶連接池的使用

/Users/joyce/work/jds/trade/trade-shenjinrong/jincePfyhServer com.jincetrade.pfyhserver.client.PfyhClientPool

2)自己for循環

教你正確地利用Netty建立連接池

 

演變:

a)

for (

bootstrap.group(eventLoopGroup).connect

)

問題很明顯,如果每一個channel都對應一個NIOEventLoopGroup,那么我們實際上構建了一個connection:thread = 1:1的模型,隨着連接數不斷地擴大,線程膨脹的問題就會突顯出來。

 而nio本身是一個線程處理所有連接

b)

bootstrap.group(eventLoopGroup)

for(

bootstrap.connect

)

 

本文在2.b方式上結合apache common pool 2 實現

package com.jds.test.proto;

/**
 * Created by sunyuming on 19/8/13.
 */

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 *
 * EWrapper連接池工廠
 * @author shane
 *
 */
public class ClientPool extends BasePooledObjectFactory<Channel>{

    private String ip;
    private int port;

    public ClientPool() {
        ip = "127.0.0.1";
        port = 8866;
    }

    public static void main(String[] args) throws Exception {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxWaitMillis(2000);
        config.setMinIdle(3);
        config.setMaxIdle(5);
        config.setMaxTotal(5);

        /**
         * 銷毀到minidle
         */
        config.setSoftMinEvictableIdleTimeMillis(10000);
        config.setTestOnBorrow(true);

        /**
         * 銷毀所有
         */
        //   config.setMinEvictableIdleTimeMillis(3000);

        // 詞句是必須的,否則(Soft)MinEvictableIdleTimeMillis不生效
        config.setTimeBetweenEvictionRunsMillis(100);

        GenericObjectPool<Channel> objectPool = new GenericObjectPool<Channel>(new ClientPool(), config);

        // 先睡5s讓系統補足3個對象
        Thread.sleep(5000);
        List<Channel> list = new ArrayList<>();
        for(int i=0; i<4; ++i) {
            list.add(objectPool.borrowObject());
        }

        for(Channel channel : list) {
            objectPool.returnObject(channel);
        }

        /**
         * 休息8秒等待服務端斷開連接
         */
        Thread.sleep(8000);

        /**
         * 觸發activate,validate后失敗,觸發create
         */
        objectPool.borrowObject();

    }

    @Override
    public Channel create() throws Exception {
        ChannelFuture future = getBoot().connect(new InetSocketAddress("127.0.0.1",8866)).sync();
        return future.channel();
    }

    @Override
    public PooledObject<Channel> wrap(Channel channel) {
        System.out.println("創建" + new Date());
        return new DefaultPooledObject<Channel>(channel);
    }

    /**
     * 對象銷毀
     * @param pooledObject
     */
    @Override
    public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
        System.out.println("銷毀" + new Date());
        Channel channel = pooledObject.getObject();
        channel.close();
        super.destroyObject(pooledObject);
    }

    /**
     * 驗證對象有效性
     * @param
     * @return
     */
    @Override
    public boolean validateObject(PooledObject<Channel> pooledObject) {
        System.out.println("validate " + pooledObject.getObject().isActive() + new Date());
        return pooledObject.getObject().isActive();
    }

    private static Bootstrap bootstrap;
    public static Bootstrap getBoot() {
        if(bootstrap == null) {
            synchronized (ClientPool.class) {
                if (bootstrap == null) {
                    EventLoopGroup worker = new NioEventLoopGroup();

                    //輔助啟動類
                    bootstrap = new Bootstrap();

                    //設置線程池
                    bootstrap.group(worker);

                    //設置socket工廠
                    bootstrap.channel(NioSocketChannel.class);

                    //設置管道
                    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //獲取管道
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4));
                            pipeline.addLast(new ProtobufDecoder(MyBaseProto.BaseProto.getDefaultInstance()));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            pipeline.addLast(new ProtobufEncoder());

                            pipeline.addLast(new IdleStateHandler(61, 30, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new ClientHeartbeatHandler());

                            //處理類
                            pipeline.addLast(new ClientHandler4Heart());
                        }
                    });

                    return bootstrap;
                } else {
                    return bootstrap;
                }
            }

        } else {
            return bootstrap;
        }
    }



}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM