Druid數據庫連接池源碼分析


  上一篇文章重點介紹了一下Java的Future模式,最后意淫了一個數據庫連接池的場景。本想通過Future模式來防止,當多個線程同時獲取數據庫連接時各自都生成一個,造成資源浪費。但是忽略了一個根本的功能,就是多個線程同時調用get方法時,得到的是同一個數據庫連接的多個引用,這會導致嚴重的問題。

  所以,我抽空看了看呼聲很高的Druid的數據庫連接池實現,當然關注點主要是多線程方面的處理。我覺得,帶着問題去看源碼是一種很好的思考方式。

  Druid不僅僅是一個數據庫連接池,還有很多標簽,比如統計監控、過濾器、SQL解析等。既然要分析連接池,那先看看DruidDataSource類

getConnection方法的實現:

 

    @Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

 

返回的是一個DruidPooledConnection,這個類后面再說;另外這里傳入了一個long類型maxWait,應該是用來做超時處理的;init方法在getConnection方法里面調用,這也是一種很好的設計;里面的過濾器鏈的處理就不多說了。

    public void init() throws SQLException {
        if (inited) {
            return;
        }

        final ReentrantLock lock = this.lock;  // 使用lock而不是synchronized
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            if (inited) {
                return;
            }

            init = true;

            connections = new DruidConnectionHolder[maxActive];  // 數組

            try {
                // init connections
                for (int i = 0, size = getInitialSize(); i < size; ++i) {
                    Connection conn = createPhysicalConnection();  // 生成真正的數據庫連接
                    DruidConnectionHolder holder = new DruidConnectionHolder(this, conn);
                    connections[poolingCount] = holder;
                    incrementPoolingCount();
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            } catch (SQLException ex) {
                LOG.error("init datasource error, url: " + this.getUrl(), ex);
                connectError = ex;
            }

            createAndLogThread();
            createAndStartCreatorThread();
            createAndStartDestroyThread();

            initedLatch.await();

            initedTime = new Date();
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }
        } catch (SQLException e) {
            LOG.error("dataSource init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } finally {
            inited = true;
            lock.unlock();  // 釋放鎖

            if (init && LOG.isInfoEnabled()) {
                LOG.info("{dataSource-" + this.getID() + "} inited");
            }
        }
    }    

  我這里做了刪減,加了一些簡單的注釋。通過這個方法,正好復習一下之前寫的那些知識點,如果感興趣,可以看看我之前寫的文章。

  這里使用了lock,並且保證只會被執行一次。根據初始容量,先生成了一批數據庫連接,用一個數組connections存放這些連接的引用,而且專門定義了一個變量poolingCount來保存這些連接的總數量。

  看到initedLatch.await有一種似曾相識的感覺

 

    private final CountDownLatch             initedLatch             = new CountDownLatch(2);

 

  這里調用了await方法,那countDown方法在哪些線程里面被調用呢

    protected void createAndStartCreatorThread() {
        if (createScheduler == null) {
            String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
            createConnectionThread = new CreateConnectionThread(threadName);
            createConnectionThread.start();
            return;
        }

        initedLatch.countDown();
    }

  這里先判斷createScheduler這個調度線程池是否被設置,如果沒有設置,直接countDown;否則,就開啟一個創建數據庫連接的線程,當然這個線程的run方法還是會調用countDown方法。但是這里我有一個疑問:開啟創建連接的線程,為什么一定要有一個調度線程池呢???

  難道是當數據庫連接創建失敗的時候,需要過了指定時間后,再重試?這么理解好像有點牽強,希望高人來評論。

  還有就是,當開啟destroy線程的時候也會調用countDown方法。

 

  接着在看getConnection方法,一直調用到getConnectionInternal方法

        DruidConnectionHolder holder;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            connectErrorCount.incrementAndGet();
            throw new SQLException("interrupt", e);
        }

        try {
            if (maxWait > 0) {
                holder = pollLast(nanos);
            } else {
                holder = takeLast();
            }

        } catch (InterruptedException e) {
            connectErrorCount.incrementAndGet();
            throw new SQLException(e.getMessage(), e);
        } catch (SQLException e) {
            connectErrorCount.incrementAndGet();
            throw e;
        } finally {
            lock.unlock();
        }

        holder.incrementUseCount();

        DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
        return poolalbeConnection;    

  我這里還是做了刪減。大體邏輯是:先從連接池中取出DruidConnectionHolder,然后再封裝成DruidPooledConnection對象返回。再看看取holder的方法:

    DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            while (poolingCount == 0) {
                emptySignal(); // send signal to CreateThread create connection
                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;

                if (!enable) {
                    connectErrorCount.incrementAndGet();
                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

  這個方法非常好的詮釋了Lock-Condition的使用場景,幾行綠色的注釋解釋的很明白了,如果對empty和notEmpty看不太懂,可以去看看我之前寫的那篇文章。

  這個方法的邏輯:先判斷池中的連接數,如果到0了,那么本線程就得被掛起,同時釋放empty信號,並且等待notEmpty的信號。如果還有連接,就取出數組的最后一個,同時更改poolingCount。

 

  到這里,基本理解了Druid數據庫連接池獲取連接的實現流程。但是,如果不去看看里面的數據結構,還是會一頭霧水。我們就看看幾個基本的類,以及它們之間的持有關系。

  1、DruidDataSource持有一個DruidConnectionHolder的數組,保存所有的數據庫連接

private volatile DruidConnectionHolder[] connections;  // 注意這里的volatile

  2、DruidConnectionHolder持有數據庫連接,還有所在的DataSource等

    private final DruidAbstractDataSource       dataSource;
    private final Connection                    conn;

  3、DruidPooledConnection持有DruidConnectionHolder,所在線程等

    protected volatile DruidConnectionHolder holder;

    private final Thread                     ownerThread;

  對於這種設計,我很好奇為什么要添加一層holder做封裝,數組里直接存放Connection好像也未嘗不可。

  其實,這么設計是有道理的。比如說,一個Connection對象可以產生多個Statement對象,當我們想同時保存Connection和對應的多個Statement的時候,就比較糾結。

  再看看DruidConnectionHolder的成員變量

    private PreparedStatementPool               statementPool;

    private final List<Statement>               statementTrace           = new ArrayList<Statement>(2);

這樣的話,既可以做緩存,也可以做統計。

 

  最終我們對Connection的操作都是通過DruidPooledConnection來實現,比如commit、rollback等,它們大都是通過實際的數據庫連接完成工作。而我比較關心的是close方法的實現,close方法最核心的邏輯是recycle方法:

    public void recycle() throws SQLException {
        if (this.disable) {
            return;
        }

        DruidConnectionHolder holder = this.holder;
        if (holder == null) {
            if (dupCloseLogEnable) {
                LOG.error("dup close");
            }
            return;
        }

        if (!this.abandoned) {
            DruidAbstractDataSource dataSource = holder.getDataSource();
            dataSource.recycle(this);
        }

        this.holder = null;
        conn = null;
        transactionInfo = null;
        closed = true;
    }

  通過最后幾行代碼,能夠看出,並沒有調用實際數據庫連接的close方法,而只是斷開了之前那張圖里面的4號引用。用這種方式,來實現數據庫連接的復用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM