本系列主要關注安卓數據庫的線程行為,分為四個部分:
(1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
(2)SQLiteDatabase的實現以及多線程行為
(3)連接緩存池SQLiteConnectionPool
(4)SQLiteDatabase多線程實踐
本篇主要關注SQLiteConnectionPool
(連接池)在並發下的行為。
上文提到,SQLiteDatabase
會在每個線程中使用一個SQLiteSession
,而SQLiteSession
會共用一個SQLiteConnectionPool
對象,並通過SQLiteConnectionPool
的acquireConnection
和releaseConnection
方法來獲取和釋放數據庫連接(一個SQLiteConnection
對象)。
public SQLiteConnection acquireConnection(String sql, int connectionFlags, CancellationSignal cancellationSignal) { return waitForConnection(sql, connectionFlags, cancellationSignal);//看名字是要等待什么鎖了 }
connectionFlags
是用SQLiteDatabase.getThreadDefaultConnectionFlags
的返回值一路傳下來的,這個方法在前文討論過這個方法,會記錄兩件事:1.數據庫是只讀還是可寫;2.當前是否主線程。
waitForConnection
方法比較長,我們一段一段地看。
1 嘗試立即獲取連接
//是否可寫連接。可寫的連接同一時間只能存在一個。 final boolean wantPrimaryConnection = (connectionFlags & CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0; final ConnectionWaiter waiter; final int nonce; synchronized (mLock) {//加鎖。留意這一段代碼中加鎖部分並未結束。 throwIfClosedLocked(); // Abort if canceled. if (cancellationSignal != null) { cancellationSignal.throwIfCanceled(); } // Try to acquire a connection. SQLiteConnection connection = null; if (!wantPrimaryConnection) { //嘗試獲取只讀連接 connection = tryAcquireNonPrimaryConnectionLocked( sql, connectionFlags); // might throw } if (connection == null) { //嘗試獲取可寫連接 connection = tryAcquirePrimaryConnectionLocked(connectionFlags); // might throw } if (connection != null) { return connection; }
到這里是嘗試直接獲取連接。嘗試的方法有tryAcquireNonPrimaryConnectionLocked
和tryAcquirePrimaryConnectionLocked
。只讀時只需要 non primary connection,而需要寫時要primary connection。
先看tryAcquirePrimaryConnectionLocked
:
// Might throw. private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) { // If the primary connection is available, acquire it now. SQLiteConnection connection = mAvailablePrimaryConnection;//同時只能存在一個可寫連接,用一個成員變量mAvailablePrimaryConnection緩存空閑連接 if (connection != null) {//有緩存返回即可。finishAcquirePrimaryConnection會把connection放到mAcquiredConnections中。mAcquiredConnections存儲正在使用的連接。 mAvailablePrimaryConnection = null;//不再空閑 finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; } // Make sure that the primary connection actually exists and has just been acquired. //如果上一個if造成了不再空閑,則mAcquiredConnections中就會有一個primary connection,這里就會返回null。上一層的waitForConnection接到null會進入等待狀態,這個后面討論。 for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) { if (acquiredConnection.isPrimaryConnection()) { return null; } } //如果沒有在上面返回null,那么這一定是第一次請求primary connnection,或者有一個連接泄露了(未recycle的情況下finalize),這時候就需要用openConnectionLocked去新開一個連接。 // Uhoh. No primary connection! Either this is the first time we asked // for it, or maybe it leaked? connection = openConnectionLocked(mConfiguration, true /*primaryConnection*/); // might throw finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }
然后看tryAcquireNonPrimaryConnectionLocked
:
// Might throw. private SQLiteConnection tryAcquireNonPrimaryConnectionLocked( String sql, int connectionFlags) { // Try to acquire the next connection in the queue. SQLiteConnection connection; //只讀連接可以有多個,用一個ArrayList緩存了所有空閑連接 final int availableCount = mAvailableNonPrimaryConnections.size(); if (availableCount > 1 && sql != null) { // If we have a choice, then prefer a connection that has the // prepared statement in its cache. // 如上面的英文注釋說的,如果有不止一個連接可選,那么挑選緩存了相同sql語句的那個。可能SQLiteConnection對此有優化? for (int i = 0; i < availableCount; i++) { connection = mAvailableNonPrimaryConnections.get(i); if (connection.isPreparedStatementInCache(sql)) {//如果有相同sql,返回 mAvailableNonPrimaryConnections.remove(i); finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; } } } if (availableCount > 0) { // Otherwise, just grab the next one. //沒有挑到,隨便給一個 connection = mAvailableNonPrimaryConnections.remove(availableCount - 1); finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; } // 一個空閑連接都沒有。 // Expand the pool if needed. int openConnections = mAcquiredConnections.size(); if (mAvailablePrimaryConnection != null) { openConnections += 1; } // 上面在計算有多少已打開連接(空閑+使用中)。這里肯定沒有空閑non primary連接了,而如果有空閑primary連接,則要 += 1。 if (openConnections >= mMaxConnectionPoolSize) { // 超過數據庫連接限制,放棄治療。連接限制與數據庫底層實現有關。 return null; } // 沒超限,還能再開一個連接。所以開連接並返回。 connection = openConnectionLocked(mConfiguration, false /*primaryConnection*/); // might throw finishAcquireConnectionLocked(connection, connectionFlags); // might throw return connection; }
在這一步中,進行了從緩存中取得連接的嘗試;而如果無法取得連接,也進行了打開連接的嘗試。如果再無法打開的話,就會拿到一個null了。后續就需要進行等待。
2 等待獲取連接
// 留意這里還在上一個鎖mLock中 // No connections available. Enqueue a waiter in priority order. final int priority = getPriority(connectionFlags);//主線程中的連接優先級更高,記得嗎? final long startTime = SystemClock.uptimeMillis(); // waiter是一個ConnectionWaiter對象。它同時也是一個鏈表,有一個同類的mNext成員變量。 // obtainConnectionWaiterLocked會去復用(取鏈表頭)或者新建一個對象。 waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime, priority, wantPrimaryConnection, sql, connectionFlags); ConnectionWaiter predecessor = null; // 按照優先級向mConnectionWaiterQueue添加waitor對象。mConnectionWaiterQueue不是復用池,而是有效的等待隊列(也是鏈表)。 ConnectionWaiter successor = mConnectionWaiterQueue; while (successor != null) { if (priority > successor.mPriority) { waiter.mNext = successor; break; } predecessor = successor; successor = successor.mNext; } if (predecessor != null) { predecessor.mNext = waiter; } else { mConnectionWaiterQueue = waiter; } nonce = waiter.mNonce;//觀察recycleConnectionWaiterLocked方法,mNonce在waiter每次被復用完成回收時自增1 }//鎖mLock結束
到這里就是把需要等待的連接信息封裝到ConnectionWaiter
中,並將ConnectionWaiter
對象放到一個鏈表里。那么什么時候會結束等待並返回呢?繼續看代碼:
// Set up the cancellation listener. if (cancellationSignal != null) { cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() { @Override public void onCancel() { synchronized (mLock) { if (waiter.mNonce == nonce) {//nonce的作用在這里體現。防止waiter對象復用造成誤取消。 cancelConnectionWaiterLocked(waiter); } } } }); }
這一段用於額外處理取消信號的。在等待連接過程中取消,就可以把這一個waiter去除了。
接下來:
try { // Park the thread until a connection is assigned or the pool is closed. // Rethrow an exception from the wait, if we got one. long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS; long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis; for (;;) {//循環開始 // Detect and recover from connection leaks. // 管理泄露連接的。如果一個SQLiteConnection在finalize時還未關閉,則會置泄露狀態。 // mConnectionLeaked是一個AtomicBoolean。 if (mConnectionLeaked.compareAndSet(true, false)) { synchronized (mLock) { wakeConnectionWaitersLocked();//有泄露連接被關閉的話,最大連接限制下就可能有位置空出來,這時候就可以嘗試分配一個連接 } } // Wait to be unparked (may already have happened), a timeout, or interruption. // 等待。那么unpark在哪里?在wakeConnectionWaitersLocked中。這個方法在上面泄露測試時調用過。 // 還有cancelConnectionWaiterLocked中,取消等待自然要喚醒線程處理一下。 LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L); // Clear the interrupted flag, just in case. Thread.interrupted(); // Check whether we are done waiting yet. synchronized (mLock) { throwIfClosedLocked(); //等到了一個Connection。這個mAssignedConnection是何時賦值的呢? //也是在wakeConnectionWaitersLocked中賦值的。 final SQLiteConnection connection = waiter.mAssignedConnection; final RuntimeException ex = waiter.mException; if (connection != null || ex != null) { recycleConnectionWaiterLocked(waiter);//回收waiter,會造成mNonce自增1 if (connection != null) { return connection; } throw ex; // rethrow! } //沒拿到連接,繼續等。 final long now = SystemClock.uptimeMillis(); if (now < nextBusyTimeoutTime) { busyTimeoutMillis = now - nextBusyTimeoutTime; } else { logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags); busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS; nextBusyTimeoutTime = now + busyTimeoutMillis; } } }//循環結束 } finally { // Remove the cancellation listener. if (cancellationSignal != null) { cancellationSignal.setOnCancelListener(null); } } }
在這一步中,用ConnectionWaiter
來封裝等待中的連接信息,並按優先級放入一個鏈表,隨后進入等待狀態。獲取到連接后,等待狀態結束,返回連接。
3 連接的釋放
這里我們可以先預估以下:釋放連接時需要把被釋放的連接放回到空閑連接集合,並進行unpark操作,通知正在等待連接的線程。
代碼如下:
public void releaseConnection(SQLiteConnection connection) { synchronized (mLock) { AcquiredConnectionStatus status = mAcquiredConnections.remove(connection);//從活躍連接池中移除 if (status == null) { throw new IllegalStateException("Cannot perform this operation " + "because the specified connection was not acquired " + "from this pool or has already been released."); } if (!mIsOpen) { closeConnectionAndLogExceptionsLocked(connection); } else if (connection.isPrimaryConnection()) { if (recycleConnectionLocked(connection, status)) { assert mAvailablePrimaryConnection == null; mAvailablePrimaryConnection = connection;//放回可寫連接mAvailablePrimaryConnection } wakeConnectionWaitersLocked();//通知其它線程 } else if (mAvailableNonPrimaryConnections.size() >= mMaxConnectionPoolSize - 1) { closeConnectionAndLogExceptionsLocked(connection); } else { if (recycleConnectionLocked(connection, status)) { mAvailableNonPrimaryConnections.add(connection);//放回空閑只讀連接池 } wakeConnectionWaitersLocked();//通知其它線程 } } }
4 總結
綜上所述,SQLiteConnectionPool
提供數據庫連接的流程如下:
(1)從緩存中獲取一個空閑的連接。若有多個空閑連接,優先挑選執行過相同SQL的那個。注意如果是寫操作的話,則會返回一個primary connection,並將其它嘗試獲得primary connection的線程阻塞,直到當前線程結束使用連接。而只讀的操作則可以同時存在多個,並可以和寫操作的連接共存。
(2)如果緩存中沒有連接,檢查底層數據庫是否可以容納更多連接。如果可以,新建一個連接並返回。
(3)如果底層數據庫不再允許增加連接,則進入等待。到超時或者有其它連接被釋放結束等待。如果此時可以獲取連接,則返回連接。如果不能,進入新一輪等待。
5 多線程下的transaction
了解了以上的特性之后,transaction的多線程行為就比較好理解了。
以下是SQLiteDatabase
的beginTransaction
方法:
private void beginTransaction(SQLiteTransactionListener transactionListener, boolean exclusive) { acquireReference(); try { getThreadSession().beginTransaction( exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE : SQLiteSession.TRANSACTION_MODE_IMMEDIATE, transactionListener, getThreadDefaultConnectionFlags(false /*readOnly*/), null); } finally { releaseReference(); } }
留意flags參數傳入的readOnly為false,所以SQLiteSession
會從SQLiteConnectionPool
中獲取一個獨占的連接。並且在SQLiteSession
執行其它SQL語句的情況下,執行完成會將連接釋放回連接池,而beginTransaction
操作則不會,而是持有這一個連接直至同一線程內調用endTransaction
。這里再貼一遍SQLiteSession.execute
源碼:
public void execute(String sql, Object[] bindArgs, int connectionFlags, CancellationSignal cancellationSignal) { if (sql == null) { throw new IllegalArgumentException("sql must not be null."); } if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) { return; } acquireConnection(sql, connectionFlags, cancellationSignal); // might throw try { mConnection.execute(sql, bindArgs, cancellationSignal); // might throw } finally { //這里釋放了連接(其實是交還給連接池) releaseConnection(); // might throw } }
所以,當有一個線程在transaction過程中時,其它線程的寫操作和beginTransaction
操作都會被阻塞住,直至當前線程的transaction完成才會按照優先級挑選一個線程繼續。