Lettuce連接池


       Lettuce 連接被設計為線程安全,所以一個連接可以被多個線程共享,同時lettuce連接默認是自動重連.雖然連接池在大多數情況下是不必要的,但在某些用例中可能是有用的.lettuce提供通用的連接池支持. 如有疏漏后續會更新 https://www.cnblogs.com/wei-zw/p/9163687.html

連接池是否有必要?


     Lettuce被線程安全的,它滿足了多數場景需求. 所有Redis用戶的操作是單線程執行的.使用多連接並不能改善一個應用的性能. 阻塞操作的使用通常與獲得專用連接的工作線程結合在一起.
        使用Redis事務是使用動態連接池的典型場景,因為需要專用連接的線程數趨於動態.也就是說,動態連接池的需求是有限的.連接池總是伴隨着復雜性和維護成本提升.

同步連接池

  使用命令式編程,同步連接池是正確的選擇,因為它在用於執行執行Redis命令的線程上執行所有操作.

   前提條件
       Lettuce需要依賴 Apache的 common-pool2(至少是2.2)提供連接池. 確認在你的classpath下包含這個依賴.否則你就不能使用連接池.
如果使用Maven,向你的pom.xml添加如下依賴

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.3</version>
</dependency>

 連接池支持

Lettuce提供通用連接池支持,它需要一個用於創建任何支持類型連接(單個,發布訂閱,哨兵,主從,集群)的提供者. ConnectionPoolSupport 將根據你的需求創建一個 GenericObjectPool或SoftReferenceObjectPool. 連接池可以分配包裝類型或直接連接

  • 包裝實例在調用StatefulConnection.close()時,會將連接歸還到連接池
  • 直接連接需要調用GenericObjectPool.returnObject(...)歸還到連接池

基本用法:

  包裝連接

  GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(2);

        GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(
                () -> client.connect(), poolConfig);
        for (int i = 0; i < 10; i++) {
            StatefulRedisConnection<String, String> connection = pool.borrowObject();
            RedisCommands<String, String> sync = connection.sync();
            sync.ping();
            connection.close();
        }

 

直接連接

     GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(
                () -> client.connect(), new GenericObjectPoolConfig(), false);

        for (int i = 0; i < 10; i++) {
            StatefulRedisConnection<String, String> connection = pool.borrowObject();
            RedisCommands<String, String> sync = connection.sync();
            sync.ping();
       //主動將連接歸還到連接池  pool.returnObject(connection); }

  

相關源碼分析

 public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                //如果wrapConnection 設置為true,則對連接創建動態代理
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }

  創建一個包裝類型到連接

 private static <T> T wrapConnection(T connection, ObjectPool<T> pool) {
        
        //創建調用處理器
        ReturnObjectOnCloseInvocationHandler<T> handler = new ReturnObjectOnCloseInvocationHandler<T>(connection, pool);

        Class<?>[] implementedInterfaces = connection.getClass().getInterfaces();
        Class[] interfaces = new Class[implementedInterfaces.length + 1];
        interfaces[0] = HasTargetConnection.class;
        System.arraycopy(implementedInterfaces, 0, interfaces, 1, implementedInterfaces.length);
        //創建代理連接
        T proxiedConnection = (T) Proxy.newProxyInstance(connection.getClass().getClassLoader(), interfaces, handler);
        //向連接調用處理器設置代理連接
        handler.setProxiedConnection(proxiedConnection);
        //返回代理連接
        return proxiedConnection;
    }

  包裝類型連接的動態調用處理器

  private static class ReturnObjectOnCloseInvocationHandler<T> extends AbstractInvocationHandler {
        //被代理對連接
        private T connection;
        private T proxiedConnection;
        private Map<Method, Object> connectionProxies = new ConcurrentHashMap<>(5, 1);
        //連接池
        private final ObjectPool<T> pool;

        ReturnObjectOnCloseInvocationHandler(T connection, ObjectPool<T> pool) {
            this.connection = connection;
            this.pool = pool;
        }
        
        //設置代理連接
        void setProxiedConnection(T proxiedConnection) {
            this.proxiedConnection = proxiedConnection;
        }

        @Override
        protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
             //如果調用方法是  getStatefulConnection則返回代理連接
            if (method.getName().equals("getStatefulConnection")) {
                return proxiedConnection;
            }
            //如果調用的方法是getTargetConnection 則返回真實連接
            if (method.getName().equals("getTargetConnection")) {
                return connection;
            }
            //如果真實連接為null則拋出異常
            if (connection == null) {
                throw new RedisException("Connection is deallocated and cannot be used anymore.");
            }
            //如果調用的方法是close則將代理連接歸還到連接池,並將真實連接設置和代理連接設置為null
            if (method.getName().equals("close")) {
                pool.returnObject(proxiedConnection);
                connection = null;
                proxiedConnection = null;
                connectionProxies.clear();
                return null;
            }

            try {
                //如果調用方法是獲取連接則從代理連接池中獲取,如果沒有則創建代理連接並放入緩存
                if (method.getName().equals("sync") || method.getName().equals("async") || method.getName().equals("reactive")) {
                    return connectionProxies.computeIfAbsent(
                            method, m -> getInnerProxy(method, args));
                }
                //其它方法不在多任何攔截
                return method.invoke(connection, args);

            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }
        }

        @SuppressWarnings("unchecked")
        private Object getInnerProxy(Method method, Object[] args) {

            try {
                Object result = method.invoke(connection, args);

                result = Proxy.newProxyInstance(getClass().getClassLoader(), result.getClass().getInterfaces(),
                        new DelegateCloseToConnectionInvocationHandler<>((AutoCloseable) proxiedConnection, result));

                return result;
            } catch (IllegalAccessException e) {
                throw new RedisException(e);
            } catch (InvocationTargetException e) {
                throw new RedisException(e.getTargetException());

            }
        }

        public T getConnection() {
            return connection;
        }
    }

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM