SQLite剖析之事務處理技術


前言

  事務處理是DBMS中最關鍵的技術,對SQLite也一樣,它涉及到並發控制,以及故障恢復等等。在數據庫中使用事務可以保證數據的統一和完整性,同時也可以提高效率。假設需要在一張表內一次插入20個人的名字才算是操作成功,那么在不使用事務的情況下,如果插入過程中出現異常或者在插入過程中出現一些其他數據庫操作的話,就很有可能影響了操作的完整性。所以事務可以很好地解決這樣的情況,首先事務是可以把啟動事務過程中的所有操作視為事務的過程。等到所有過程執行完畢后,我們可以根據操作是否成功來決定事務是否進行提交或者回滾。提交事務后會一次性把所有數據提交到數據庫,如果回滾了事務就會放棄這次的操作,而對原來表的數據不進行更改。

  SQLite中分別以BEGIN、COMMIT和ROLLBACK啟動、提交和回滾事務。見如下示例:

@try{    
  char *errorMsg;   
  if (sqlite3_exec(_database, "BEGIN", NULL, NULL, &errorMsg)==SQLITE_OK) {    
    NSLog(@”啟動事務成功”);    
    sqlite3_free(errorMsg);    
    sqlite3_stmt *statement;   
    if (sqlite3_prepare_v2(_database, [@"insert into persons(name) values(?);" UTF8String], -1, &statement, NULL)==SQLITE_OK) {    
      //綁定參數    
      const char *text=[@”張三” cStringUsingEncoding:NSUTF8StringEncoding];    
      sqlite3_bind_text(statement, index, text, strlen(text), SQLITE_STATIC);      
      if (sqlite3_step(statement)!=SQLITE_DONE) {    
        sqlite3_finalize(statement);    
      }    
    }      
    if (sqlite3_exec(_database, "COMMIT", NULL, NULL, &errorMsg)==SQLITE_OK) {    
      NSLog(@”提交事務成功”);    
    }    
    sqlite3_free(errorMsg);    
  }
  else{     sqlite3_free(errorMsg);   } } @catch(NSException *e){   char *errorMsg;   if (sqlite3_exec(_database, "ROLLBACK", NULL, NULL, &errorMsg)==SQLITE_OK) {     NSLog(@”回滾事務成功”);   }   sqlite3_free(errorMsg); } @finally{ }

  在SQLite中,如果沒有為當前的SQL命令(SELECT除外)顯示的指定事務,那么SQLite會自動為該操作添加一個隱式的事務,以保證該操作的原子性和一致性。當然,SQLite也支持顯示的事務,其語法與大多數關系型數據庫相比基本相同。見如下示例:

sqlite> BEGIN TRANSACTION;
sqlite> INSERT INTO testtable VALUES(1);
sqlite> INSERT INTO testtable VALUES(2); 
sqlite> COMMIT TRANSACTION;      --顯示事務被提交,數據表中的數據也發生了變化。
sqlite> SELECT COUNT(*) FROM testtable;
COUNT(*)
----------
2
sqlite> BEGIN TRANSACTION;
sqlite> INSERT INTO testtable VALUES(1);
sqlite> ROLLBACK TRANSACTION;   --顯示事務被回滾,數據表中的數據沒有發生變化。
sqlite> SELECT COUNT(*) FROM testtable;
COUNT(*)
----------
2

Page Cache之事務處理——SQLite原子提交的實現

  下面通過具體示例來分析SQLite原子提交的實現(基於Version 3.3.6的代碼):

CREATE TABLE episodes( id integer primary key,name text, cid int);
insert into episodes(name,cid) values("cat",1);    --插入一條記錄

  它經過編譯器處理后生成的虛擬機代碼如下:

sqlite> explain insert into episodes(name,cid) values("cat",1);
0|Trace|0|0|0|explain insert into episodes(name,cid) values("cat",1);|00|
1|Goto|0|12|0||00|
2|SetNumColumns|0|3|0||00|
3|OpenWrite|0|2|0||00|
4|NewRowid|0|2|0||00|
5|Null|0|3|0||00|
6|String8|0|4|0|cat|00|
7|Integer|1|5|0||00|
8|MakeRecord|3|3|6|dad|00|
9|Insert|0|6|2|episodes|0b|
10|Close|0|0|0||00|
11|Halt|0|0|0||00|
12|Transaction|0|1|0||00|
13|VerifyCookie|0|1|0||00|
14|Transaction|1|1|0||00|
15|VerifyCookie|1|0|0||00|
16|TableLock|0|2|1|episodes|00|
17|Goto|0|2|0||00|

1、初始狀態(Initial State)
  當一個數據庫連接第一次打開時,狀態如圖所示。圖中最右邊(“Disk”標注)表示保存在存儲設備中的內容。每個方框代表一個扇區。藍色的塊表示這個扇區保存了原始數據。圖中中間區域是操作系統的磁盤緩沖區。開始的時候,這些緩存是還沒有被使用,因此這些方框是空白的。圖中左邊區域顯示SQLite用戶進程的內存。因為這個數據庫連接剛剛打開,所以還沒有任何數據記錄被讀入,所以這些內存也是空的。

 

2、獲取讀鎖(Acquiring A Read Lock)
  在SQLite寫數據庫之前,它必須先從數據庫中讀取相關信息。比如,在插入新的數據時,SQLite會先從sqlite_master表中讀取數據庫模式(相當於數據字典),以便編譯器對INSERT語句進行分析,確定數據插入的位置。
在進行讀操作之前,必須先獲取數據庫的共享鎖(shared lock),共享鎖允許兩個或更多的連接在同一時刻讀取數據庫。但是共享鎖不允許其它連接對數據庫進行寫操作。
  shared lock存在於操作系統磁盤緩存,而不是磁盤本身。文件鎖的本質只是操作系統的內核數據結構,當操作系統崩潰或掉電時,這些內核數據也會隨之消失。


3、讀取數據
  一旦得到shared lock,就可以進行讀操作。如圖所示,數據先由OS從磁盤讀取到OS緩存,然后再由OS移到用戶進程空間。一般來說,數據庫文件分為很多頁,而一次讀操作只讀取一小部分頁面。如圖,從8個頁面讀取3個頁面。

4、獲取Reserved Lock
  在對數據進行修改操作之前,先要獲取數據庫文件的Reserved Lock,Reserved Lock和shared lock的相似之處在於,它們都允許其它進程對數據庫文件進行讀操作。Reserved Lock和Shared Lock可以共存,但是只能是一個Reserved Lock和多個Shared Lock——多個Reserved Lock不能共存。所以,在同一時刻,只能進行一個寫操作。
  Reserved Lock意味着當前進程(連接)想修改數據庫文件,但是還沒開始修改操作,所以其它的進程可以讀數據庫,但不能寫數據庫。

5、創建恢復日志(Creating A Rollback Journal File)
  在對數據庫進行寫操作之前,SQLite先要創建一個單獨的日志文件,然后把要修改的頁面的原始數據寫入日志。回滾日志包含一個日志頭(圖中的綠色)——記錄數據庫文件的原始大小。所以即使數據庫文件大小改變了,我們仍知道數據庫的原始大小。
  從OS的角度來看,當一個文件創建時,大多數OS(Windows、Linux、Mac OS X)不會向磁盤寫入數據,新創建的文件此時位於磁盤緩存中,之后才會真正寫入磁盤。如圖,日志文件位於OS磁盤緩存中,而不是位於磁盤。

  以上5步的實現代碼:

//事務指令的實現
//p1為數據庫文件的索引號--0為main database;1為temporary tables使用的文件
//p2不為0,一個寫事務開始
case OP_Transaction: {
  //數據庫的索引號
  int i = pOp->p1;
  //指向數據庫對應的btree
  Btree *pBt;
  assert( i>=0 && i<db->nDb );
  assert( (p->btreeMask & (1<<i))!=0 );
  //設置btree指針
  pBt = db->aDb[i].pBt;
  if( pBt ){
    //從這里btree開始事務,主要給文件加鎖,並設置btree事務狀態
    rc = sqlite3BtreeBeginTrans(pBt, pOp->p2);
    
    if( rc==SQLITE_BUSY ){
      p->pc = pc;
      p->rc = rc = SQLITE_BUSY;
      goto vdbe_return;
    }
    if( rc!=SQLITE_OK && rc!=SQLITE_READONLY /* && rc!=SQLITE_BUSY */ ){
      goto abort_due_to_error;
    }
  }
  break;
}

//開始一個事務,如果第二個參數不為0,則一個寫事務開始,否則是一個讀事務
//如果wrflag>=2,一個exclusive事務開始,此時別的連接不能訪問數據庫
int sqlite3BtreeBeginTrans(Btree *p, int wrflag){
  BtShared *pBt = p->pBt;
  int rc = SQLITE_OK;
  btreeIntegrity(p);
  /* If the btree is already in a write-transaction, or it
  ** is already in a read-transaction and a read-transaction
  ** is requested, this is a no-op.
  */
  //如果b-tree處於一個寫事務;或者處於一個讀事務,一個讀事務又請求,則返回SQLITE_OK
  if( p->inTrans==TRANS_WRITE || (p->inTrans==TRANS_READ && !wrflag) ){
    return SQLITE_OK;
  }
  /* Write transactions are not possible on a read-only database */
  //寫事務不能訪問只讀數據庫
  if( pBt->readOnly && wrflag ){
    return SQLITE_READONLY;
  }
  /* If another database handle has already opened a write transaction 
  ** on this shared-btree structure and a second write transaction is
  ** requested, return SQLITE_BUSY.
  */
  //如果數據庫已存在一個寫事務,則該寫事務請求時返回SQLITE_BUSY
  if( pBt->inTransaction==TRANS_WRITE && wrflag ){
    return SQLITE_BUSY;
  }
  do {
      //如果數據庫對應btree的第一個頁面還沒讀進內存
      //則把該頁面讀進內存,數據庫也相應的加read lock
    if( pBt->pPage1==0 ){
      //加read lock,並讀頁面到內存
      rc = lockBtree(pBt);
    }
    if( rc==SQLITE_OK && wrflag ){
      //對數據庫文件加RESERVED_LOCK鎖
      rc = sqlite3pager_begin(pBt->pPage1->aData, wrflag>1);
      if( rc==SQLITE_OK ){
        rc = newDatabase(pBt);
      }
    }
    if( rc==SQLITE_OK ){
      if( wrflag ) pBt->inStmt = 0;
    }else{
      unlockBtreeIfUnused(pBt);
    }
  }while( rc==SQLITE_BUSY && pBt->inTransaction==TRANS_NONE && sqlite3InvokeBusyHandler(pBt->pBusyHandler) );
  if( rc==SQLITE_OK ){
    if( p->inTrans==TRANS_NONE ){
      //btree的事務數加1
      pBt->nTransaction++;
    }
    //設置btree事務狀態
    p->inTrans = (wrflag?TRANS_WRITE:TRANS_READ);
    if( p->inTrans>pBt->inTransaction ){
      pBt->inTransaction = p->inTrans;
    }
  }
  btreeIntegrity(p);
  return rc;
}
/* **獲取數據庫的寫鎖,發生以下情況時去除寫鎖: ** * sqlite3pager_commit() is called. ** * sqlite3pager_rollback() is called. ** * sqlite3pager_close() is called. ** * sqlite3pager_unref() is called to on every outstanding page. **pData指向數據庫的打開的頁面,此時並不修改,僅僅只是獲取 **相應的pager,檢查它是否處於read-lock狀態 **如果打開的不是臨時文件,則打開日志文件. **如果數據庫已經處於寫狀態,則do nothing */ int sqlite3pager_begin(void *pData, int exFlag){ PgHdr *pPg = DATA_TO_PGHDR(pData); Pager *pPager = pPg->pPager; int rc = SQLITE_OK; assert( pPg->nRef>0 ); assert( pPager->state!=PAGER_UNLOCK ); //pager已經處於share狀態 if( pPager->state==PAGER_SHARED ){ assert( pPager->aInJournal==0 ); if( MEMDB ){ pPager->state = PAGER_EXCLUSIVE; pPager->origDbSize = pPager->dbSize; }else{ //對文件加 RESERVED_LOCK rc = sqlite3OsLock(pPager->fd, RESERVED_LOCK); if( rc==SQLITE_OK ){ //設置pager的狀態 pPager->state = PAGER_RESERVED; if( exFlag ){ rc = pager_wait_on_lock(pPager, EXCLUSIVE_LOCK); } } if( rc!=SQLITE_OK ){ return rc; } pPager->dirtyCache = 0; TRACE2("TRANSACTION %d\n", PAGERID(pPager)); //使用日志,不是臨時文件,則打開日志文件 if( pPager->useJournal && !pPager->tempFile ){ //為pager打開日志文件,pager應該處於RESERVED或EXCLUSIVE狀態 //會向日志文件寫入header rc = pager_open_journal(pPager); } } } return rc; }
//創建日志文件,pager應該處於RESERVED或EXCLUSIVE狀態 static int pager_open_journal(Pager *pPager){ int rc; assert( !MEMDB ); assert( pPager->state>=PAGER_RESERVED ); assert( pPager->journalOpen==0 ); assert( pPager->useJournal ); assert( pPager->aInJournal==0 ); sqlite3pager_pagecount(pPager); //日志文件頁面位圖 pPager->aInJournal = sqliteMalloc( pPager->dbSize/8 + 1 ); if( pPager->aInJournal==0 ){ rc = SQLITE_NOMEM; goto failed_to_open_journal; } //打開日志文件 rc = sqlite3OsOpenExclusive(pPager->zJournal, &pPager->jfd, pPager->tempFile); //日志文件的位置指針 pPager->journalOff = 0; pPager->setMaster = 0; pPager->journalHdr = 0; if( rc!=SQLITE_OK ){ goto failed_to_open_journal; } /*一般來說,OS此時創建的文件位於磁盤緩存,並沒有實際 **存在於磁盤,下面三個操作就是為了把結果寫入磁盤,而對於 **windows系統來說,並沒有提供相應API,所以實際上沒有意義. */ //fullSync操作對windows沒有意義 sqlite3OsSetFullSync(pPager->jfd, pPager->full_fsync); sqlite3OsSetFullSync(pPager->fd, pPager->full_fsync); /* Attempt to open a file descriptor for the directory that contains a file. **This file descriptor can be used to fsync() the directory **in order to make sure the creation of a new file is actually written to disk. */ sqlite3OsOpenDirectory(pPager->jfd, pPager->zDirectory); pPager->journalOpen = 1; pPager->journalStarted = 0; pPager->needSync = 0; pPager->alwaysRollback = 0; pPager->nRec = 0; if( pPager->errCode ){ rc = pPager->errCode; goto failed_to_open_journal; } pPager->origDbSize = pPager->dbSize; //寫入日志文件的header--24個字節 rc = writeJournalHdr(pPager); if( pPager->stmtAutoopen && rc==SQLITE_OK ){ rc = sqlite3pager_stmt_begin(pPager); } if( rc!=SQLITE_OK && rc!=SQLITE_NOMEM ){ rc = pager_unwritelock(pPager); if( rc==SQLITE_OK ){ rc = SQLITE_FULL; } } return rc; failed_to_open_journal: sqliteFree(pPager->aInJournal); pPager->aInJournal = 0; if( rc==SQLITE_NOMEM ){ /* If this was a malloc() failure, then we will not be closing the pager ** file. So delete any journal file we may have just created. Otherwise, ** the system will get confused, we have a read-lock on the file and a ** mysterious journal has appeared in the filesystem. */ sqlite3OsDelete(pPager->zJournal); }else{ sqlite3OsUnlock(pPager->fd, NO_LOCK); pPager->state = PAGER_UNLOCK; } return rc; } /*寫入日志文件頭 **journal header的格式如下: ** - 8 bytes: 標志日志文件的魔數 ** - 4 bytes: 日志文件中記錄數 ** - 4 bytes: Random number used for page hash. ** - 4 bytes: 原來數據庫的大小(kb) ** - 4 bytes: 扇區大小512byte */ static int writeJournalHdr(Pager *pPager){ //日志文件頭 char zHeader[sizeof(aJournalMagic)+16]; int rc = seekJournalHdr(pPager); if( rc ) return rc; pPager->journalHdr = pPager->journalOff; if( pPager->stmtHdrOff==0 ){ pPager->stmtHdrOff = pPager->journalHdr; } //設置文件指針指向header之后 pPager->journalOff += JOURNAL_HDR_SZ(pPager); /* FIX ME: ** ** Possibly for a pager not in no-sync mode, the journal magic should not ** be written until nRec is filled in as part of next syncJournal(). ** ** Actually maybe the whole journal header should be delayed until that ** point. Think about this. */ memcpy(zHeader, aJournalMagic, sizeof(aJournalMagic)); /* The nRec Field. 0xFFFFFFFF for no-sync journals. */ put32bits(&zHeader[sizeof(aJournalMagic)], pPager->noSync ? 0xffffffff : 0); /* The random check-hash initialiser */ sqlite3Randomness(sizeof(pPager->cksumInit), &pPager->cksumInit); put32bits(&zHeader[sizeof(aJournalMagic)+4], pPager->cksumInit); /* The initial database size */ put32bits(&zHeader[sizeof(aJournalMagic)+8], pPager->dbSize); /* The assumed sector size for this process */ put32bits(&zHeader[sizeof(aJournalMagic)+12], pPager->sectorSize); //寫入文件頭 rc = sqlite3OsWrite(pPager->jfd, zHeader, sizeof(zHeader)); /* The journal header has been written successfully. Seek the journal ** file descriptor to the end of the journal header sector. */ if( rc==SQLITE_OK ){ rc = sqlite3OsSeek(pPager->jfd, pPager->journalOff-1); if( rc==SQLITE_OK ){ rc = sqlite3OsWrite(pPager->jfd, "\000", 1); } } return rc; }

  其實現過程如下圖所示:

6、修改位於用戶進程空間的頁面(Changing Database Pages In User Space)
  頁面的原始數據寫入日志之后,就可以修改頁面了——位於用戶進程空間。每個數據庫連接都有自己私有的空間,所以頁面的變化只對該連接可見,而對其它連接的數據仍然是磁盤緩存中的數據。從這里可以明白一件事:一個進程在修改頁面數據的同時,其它進程可以繼續進行讀操作。圖中的紅色表示修改的頁面。

7、日志文件刷入磁盤(Flushing The Rollback Journal File To Mass Storage)
  接下來把日志文件的內容刷入磁盤,這對於數據庫從意外中恢復來說是至關重要的一步。而且這通常也是一個耗時的操作,因為磁盤I/O速度很慢。
  這個步驟不只把日志文件刷入磁盤那么簡單,它的實現實際上分成兩步:首先把日志文件的內容刷入磁盤(即頁面數據);然后把日志文件中頁面的數目寫入日志文件頭,再把header刷入磁盤(這一過程在代碼中清晰可見)。

  代碼如下:

/*
**Sync日志文件,保證所有的臟頁面寫入磁盤日志文件
*/
static int syncJournal(Pager *pPager){
  PgHdr *pPg;
  int rc = SQLITE_OK;

  /* Sync the journal before modifying the main database
  ** (assuming there is a journal and it needs to be synced.)
  */
  if( pPager->needSync ){
    if( !pPager->tempFile ){
      assert( pPager->journalOpen );
      /* assert( !pPager->noSync ); // noSync might be set if synchronous
      ** was turned off after the transaction was started.  Ticket #615 */
#ifndef NDEBUG
      {
        /* Make sure the pPager->nRec counter we are keeping agrees
        ** with the nRec computed from the size of the journal file.
        */
        i64 jSz;
        rc = sqlite3OsFileSize(pPager->jfd, &jSz);
        if( rc!=0 ) return rc;
        assert( pPager->journalOff==jSz );
      }
#endif
      {
        /* Write the nRec value into the journal file header. If in
        ** full-synchronous mode, sync the journal first. This ensures that
        ** all data has really hit the disk before nRec is updated to mark
        ** it as a candidate for rollback. 
        */
        if( pPager->fullSync ){
          TRACE2("SYNC journal of %d\n", PAGERID(pPager));
        //首先保證臟頁面中所有的數據都已經寫入日志文件
          rc = sqlite3OsSync(pPager->jfd, 0);
          if( rc!=0 ) return rc;
        }
        rc = sqlite3OsSeek(pPager->jfd,
                           pPager->journalHdr + sizeof(aJournalMagic));
        if( rc ) return rc;
       //頁面的數目寫入日志文件
        rc = write32bits(pPager->jfd, pPager->nRec);
        if( rc ) return rc;

        rc = sqlite3OsSeek(pPager->jfd, pPager->journalOff);
        if( rc ) return rc;
      }
      TRACE2("SYNC journal of %d\n", PAGERID(pPager));
      rc = sqlite3OsSync(pPager->jfd, pPager->full_fsync);
      if( rc!=0 ) return rc;
      pPager->journalStarted = 1;
    }
    pPager->needSync = 0;

    /* Erase the needSync flag from every page.
    */
    //清除needSync標志位
    for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
      pPg->needSync = 0;
    }
    pPager->pFirstSynced = pPager->pFirst;
  }

#ifndef NDEBUG
  /* If the Pager.needSync flag is clear then the PgHdr.needSync
  ** flag must also be clear for all pages.  Verify that this
  ** invariant is true.
  */
  else{
    for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
      assert( pPg->needSync==0 );
    }
    assert( pPager->pFirstSynced==pPager->pFirst );
  }
#endif
  return rc;
}

8、獲取排斥鎖(Obtaining An Exclusive Lock)
  在對數據庫文件進行修改之前(注:這里不是內存中的頁面),我們必須得到數據庫文件的排斥鎖(Exclusive Lock)。得到排斥鎖的過程可分為兩步:首先得到Pending lock;然后Pending lock升級到exclusive lock。
  Pending lock允許其它已經存在的Shared lock繼續讀數據庫文件,但是不允許產生新的shared lock,這樣做目的是為了防止寫操作發生餓死情況。一旦所有的shared lock完成操作,則pending lock升級到exclusive lock。

9、修改的頁面寫入文件(Writing Changes To The Database File)
  一旦得到exclusive lock,其它的進程就不能進行讀操作,此時就可以把修改的頁面寫回數據庫文件,但是通常OS都把結果暫時保存到磁盤緩存中,直到某個時刻才會真正把結果寫入磁盤。

  以上2步的實現代碼:

//把所有的臟頁面寫入數據庫
//到這里開始獲取EXCLUSIVEQ鎖,並將頁面寫回操作系統文件
static int pager_write_pagelist(PgHdr *pList){
  Pager *pPager;
  int rc;

  if( pList==0 ) return SQLITE_OK;
  pPager = pList->pPager;

  /* At this point there may be either a RESERVED or EXCLUSIVE lock on the
  ** database file. If there is already an EXCLUSIVE lock, the following
  ** calls to sqlite3OsLock() are no-ops.
  **
  ** Moving the lock from RESERVED to EXCLUSIVE actually involves going
  ** through an intermediate state PENDING.   A PENDING lock prevents new
  ** readers from attaching to the database but is unsufficient for us to
  ** write.  The idea of a PENDING lock is to prevent new readers from
  ** coming in while we wait for existing readers to clear.
  **
  ** While the pager is in the RESERVED state, the original database file
  ** is unchanged and we can rollback without having to playback the
  ** journal into the original database file.  Once we transition to
  ** EXCLUSIVE, it means the database file has been changed and any rollback
  ** will require a journal playback.
  */
  //加EXCLUSIVE_LOCK鎖
  rc = pager_wait_on_lock(pPager, EXCLUSIVE_LOCK);
  if( rc!=SQLITE_OK ){
    return rc;
  }

  while( pList ){
    assert( pList->dirty );
    rc = sqlite3OsSeek(pPager->fd, (pList->pgno-1)*(i64)pPager->pageSize);
    if( rc ) return rc;
    /* If there are dirty pages in the page cache with page numbers greater
    ** than Pager.dbSize, this means sqlite3pager_truncate() was called to
    ** make the file smaller (presumably by auto-vacuum code). Do not write
    ** any such pages to the file.
    */
    if( pList->pgno<=pPager->dbSize ){
      char *pData = CODEC2(pPager, PGHDR_TO_DATA(pList), pList->pgno, 6);
      TRACE3("STORE %d page %d\n", PAGERID(pPager), pList->pgno);
      //寫入文件
      rc = sqlite3OsWrite(pPager->fd, pData, pPager->pageSize);
      TEST_INCR(pPager->nWrite);
    }
#ifndef NDEBUG
    else{
      TRACE3("NOSTORE %d page %d\n", PAGERID(pPager), pList->pgno);
    }
#endif
    if( rc ) return rc;
    //設置dirty
    pList->dirty = 0;
#ifdef SQLITE_CHECK_PAGES
    pList->pageHash = pager_pagehash(pList);
#endif
  //指向下一個臟頁面
    pList = pList->pDirty;
  }
  return SQLITE_OK;
}

10、修改結果刷入存儲設備(Flushing Changes To Mass Storage)
  為了保證修改結果真正寫入磁盤,這一步必不可少。對於數據庫存的完整性,這一步也是關鍵的一步。由於要進行實際的I/O操作,所以和第7步一樣,將花費較多的時間。

  以上幾步實現代碼如下(以上幾步是在函數sqlite3BtreeSync()--btree.c中調用的)

//同步btree對應的數據庫文件
//該函數返回之后,只需要提交寫事務,刪除日志文件
int sqlite3BtreeSync(Btree *p, const char *zMaster){
  int rc = SQLITE_OK;
  if( p->inTrans==TRANS_WRITE ){
    BtShared *pBt = p->pBt;
    Pgno nTrunc = 0;
#ifndef SQLITE_OMIT_AUTOVACUUM
    if( pBt->autoVacuum ){
      rc = autoVacuumCommit(pBt, &nTrunc); 
      if( rc!=SQLITE_OK ){
        return rc;
      }
    }
#endif

   //調用pager進行sync
    rc = sqlite3pager_sync(pBt->pPager, zMaster, nTrunc);
  }
  return rc;
}

//把pager所有臟頁面寫回文件
int sqlite3pager_sync(Pager *pPager, const char *zMaster, Pgno nTrunc){
  int rc = SQLITE_OK;

  TRACE4("DATABASE SYNC: File=%s zMaster=%s nTrunc=%d\n", 
      pPager->zFilename, zMaster, nTrunc);

  /* If this is an in-memory db, or no pages have been written to, or this
  ** function has already been called, it is a no-op.
  */
  //pager不處於PAGER_SYNCED狀態,dirtyCache為1,
  //則進行sync操作
  if( pPager->state!=PAGER_SYNCED && !MEMDB && pPager->dirtyCache ){
    PgHdr *pPg;
    assert( pPager->journalOpen );

    /* If a master journal file name has already been written to the
    ** journal file, then no sync is required. This happens when it is
    ** written, then the process fails to upgrade from a RESERVED to an
    ** EXCLUSIVE lock. The next time the process tries to commit the
    ** transaction the m-j name will have already been written.
    */
    if( !pPager->setMaster ){
      //pager修改計數
      rc = pager_incr_changecounter(pPager);
      if( rc!=SQLITE_OK ) goto sync_exit;
#ifndef SQLITE_OMIT_AUTOVACUUM
      if( nTrunc!=0 ){
        /* If this transaction has made the database smaller, then all pages
        ** being discarded by the truncation must be written to the journal
        ** file.
        */
        Pgno i;
        void *pPage;
        int iSkip = PAGER_MJ_PGNO(pPager);
        for( i=nTrunc+1; i<=pPager->origDbSize; i++ ){
          if( !(pPager->aInJournal[i/8] & (1<<(i&7))) && i!=iSkip ){
            rc = sqlite3pager_get(pPager, i, &pPage);
            if( rc!=SQLITE_OK ) goto sync_exit;
            rc = sqlite3pager_write(pPage);
            sqlite3pager_unref(pPage);
            if( rc!=SQLITE_OK ) goto sync_exit;
          }
        } 
      }
#endif
      rc = writeMasterJournal(pPager, zMaster);
      if( rc!=SQLITE_OK ) goto sync_exit;
      
      //sync日志文件
      rc = syncJournal(pPager);
      if( rc!=SQLITE_OK ) goto sync_exit;
    }

#ifndef SQLITE_OMIT_AUTOVACUUM
    if( nTrunc!=0 ){
      rc = sqlite3pager_truncate(pPager, nTrunc);
      if( rc!=SQLITE_OK ) goto sync_exit;
    }
#endif

    /* Write all dirty pages to the database file */
    pPg = pager_get_all_dirty_pages(pPager);


   //把所有臟頁面寫回操作系統文件
    rc = pager_write_pagelist(pPg);
    if( rc!=SQLITE_OK ) goto sync_exit;

    /* Sync the database file. */
    //sync數據庫文件
    if( !pPager->noSync ){
      rc = sqlite3OsSync(pPager->fd, 0);
    }

    pPager->state = PAGER_SYNCED;
  }else if( MEMDB && nTrunc!=0 ){
    rc = sqlite3pager_truncate(pPager, nTrunc);
  }

sync_exit:
  return rc;
}

  接下來的過程如下圖所示:

 

11、刪除日志文件(Deleting The Rollback Journal)
  一旦更改寫入設備,日志文件將會被刪除,這是事務真正提交的時刻。如果在這之前系統發生崩潰,就會進行恢復處理,使得數據庫和沒發生改變一樣;如果在這之后系統發生崩潰,表明所有的更改都已經寫入磁盤。SQLite就是根據日志存在情況決定是否對數據庫進行恢復處理。刪除文件本質上不是一個原子操作,但是從用戶進程的角度來看是一個原子操作,所以一個事務看起來是一個原子操作。
  在許多系統中,刪除文件也是一個高代價的操作。作為優化,SQLite可以配置成把日志文件的長度截為0或者把日志文件頭清零。

12、釋放鎖(Releasing The Lock)
  作為原子提交的最后一步,釋放排斥鎖使得其它進程可以開始訪問數據庫。
  下圖中,我們指明了當鎖被釋放的時候用戶空間所擁有的信息已經被清空了。對於老版本的SQLite可以這么認為,但最新的SQLite會保存些用戶空間的緩存不會被清空,可能下一個事務開始的時候,這些數據剛好可以用上。重新利用這些內存要比再次從操作系統磁盤緩存或者硬盤中讀取輕松和快捷得多。在再次使用這些數據之前,我們必須先取得一個共享鎖,同時我們還不得不去檢查一下,保證還沒有其他進程在我們擁有共享鎖之前對數據庫文件進行了修改。數據庫文件的第一頁中有一個計數器,數據庫文件每做一次修改,這個計數器就會增長一下。我們可以通過檢查這個計數器就可得知是否有其他進程修改過數據庫文件。如果數據庫文件已經被修改過了,那么用戶內存空間的緩存就不得不清空,並重新讀入。大多數情況下,這種情況不大會發生,因此用戶空間的內存緩存將是有效的,這對於性能提高來說作用是顯著的。

  以上2步(以上2步是在sqlite3BtreeCommit()--btree.c函數中實現的)代碼如下:

//提交事務,至此一個事務完成.主要做兩件事:
//刪除日志文件,釋放數據庫文件的寫鎖
int sqlite3BtreeCommit(Btree *p){
  BtShared *pBt = p->pBt;
  btreeIntegrity(p);
  /* If the handle has a write-transaction open, commit the shared-btrees 
  ** transaction and set the shared state to TRANS_READ.
  */
  if( p->inTrans==TRANS_WRITE ){
    int rc;
    assert( pBt->inTransaction==TRANS_WRITE );
    assert( pBt->nTransaction>0 );

    //調用pager,提交事務
    rc = sqlite3pager_commit(pBt->pPager);
    if( rc!=SQLITE_OK ){
      return rc;
    }
    pBt->inTransaction = TRANS_READ;
    pBt->inStmt = 0;
  }
  unlockAllTables(p);

  /* If the handle has any kind of transaction open, decrement the transaction
  ** count of the shared btree. If the transaction count reaches 0, set
  ** the shared state to TRANS_NONE. The unlockBtreeIfUnused() call below
  ** will unlock the pager.
  */
  if( p->inTrans!=TRANS_NONE ){
    pBt->nTransaction--;
    if( 0==pBt->nTransaction ){
      pBt->inTransaction = TRANS_NONE;
    }
  }
}

//提交事務,主要調用pager_unwritelock()函數
int sqlite3pager_commit(Pager *pPager){
  int rc;
  PgHdr *pPg;

  if( pPager->errCode ){
    return pPager->errCode;
  }
  if( pPager->state<PAGER_RESERVED ){
    return SQLITE_ERROR;
  }
  TRACE2("COMMIT %d\n", PAGERID(pPager));
  if( MEMDB ){
    pPg = pager_get_all_dirty_pages(pPager);
    while( pPg ){
      clearHistory(PGHDR_TO_HIST(pPg, pPager));
      pPg->dirty = 0;
      pPg->inJournal = 0;
      pPg->inStmt = 0;
      pPg->needSync = 0;
      pPg->pPrevStmt = pPg->pNextStmt = 0;
      pPg = pPg->pDirty;
    }
    pPager->pDirty = 0;
#ifndef NDEBUG
    for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
      PgHistory *pHist = PGHDR_TO_HIST(pPg, pPager);
      assert( !pPg->alwaysRollback );
      assert( !pHist->pOrig );
      assert( !pHist->pStmt );
    }
#endif
    pPager->pStmt = 0;
    pPager->state = PAGER_SHARED;
    return SQLITE_OK;
  }
  if( pPager->dirtyCache==0 ){
    /* Exit early (without doing the time-consuming sqlite3OsSync() calls)
    ** if there have been no changes to the database file. */
    assert( pPager->needSync==0 );
    rc = pager_unwritelock(pPager);
    pPager->dbSize = -1;
    return rc;
  }
  assert( pPager->journalOpen );
  rc = sqlite3pager_sync(pPager, 0, 0);
  
  //刪除文件,釋放寫鎖
  if( rc==SQLITE_OK ){
    rc = pager_unwritelock(pPager);
    pPager->dbSize = -1;
  }
  return rc;
}

//對數據庫加read lock,刪除日志文件
static int pager_unwritelock(Pager *pPager){
  PgHdr *pPg;
  int rc;
  assert( !MEMDB );
  if( pPager->state<PAGER_RESERVED ){
    return SQLITE_OK;
  }
  sqlite3pager_stmt_commit(pPager);
  if( pPager->stmtOpen ){
    sqlite3OsClose(&pPager->stfd);
    pPager->stmtOpen = 0;
  }
  if( pPager->journalOpen ){

    //關閉日志文件
    sqlite3OsClose(&pPager->jfd);
    pPager->journalOpen = 0;
    //刪除日志文件
    sqlite3OsDelete(pPager->zJournal);
    sqliteFree( pPager->aInJournal );
    pPager->aInJournal = 0;
    for(pPg=pPager->pAll; pPg; pPg=pPg->pNextAll){
      pPg->inJournal = 0;
      pPg->dirty = 0;
      pPg->needSync = 0;
#ifdef SQLITE_CHECK_PAGES
      pPg->pageHash = pager_pagehash(pPg);
#endif
    }
    pPager->pDirty = 0;
    pPager->dirtyCache = 0;
    pPager->nRec = 0;
  }else{
    assert( pPager->aInJournal==0 );
    assert( pPager->dirtyCache==0 || pPager->useJournal==0 );
  }

  //釋放寫鎖,加讀鎖
  rc = sqlite3OsUnlock(pPager->fd, SHARED_LOCK);
  pPager->state = PAGER_SHARED;
  pPager->origDbSize = 0;
  pPager->setMaster = 0;
  pPager->needSync = 0;
  pPager->pFirstSynced = pPager->pFirst;
  return rc;
}

  下圖可進一步描述該過程:

其中sqlite3BtreeSync()和sqlite3BtreeCommit()是如何被調用的?

  一般來說,事務提交方式為自動提交的話,在虛擬機中的OP_Halt指令實現提交事務,相關代碼如下:

//虛擬機停機指令
case OP_Halt: {            /* no-push */
  p->pTos = pTos;
  p->rc = pOp->p1;
  p->pc = pc;
  p->errorAction = pOp->p2;
  if( pOp->p3 ){
    sqlite3SetString(&p->zErrMsg, pOp->p3, (char*)0);
  }
  //設置虛擬機狀態SQLITE_MAGIC_RUN 為 SQLITE_MAGIC_HALT,
  //並提交事務
  rc = sqlite3VdbeHalt(p);
  assert( rc==SQLITE_BUSY || rc==SQLITE_OK );
  if( rc==SQLITE_BUSY ){
    p->rc = SQLITE_BUSY;
    return SQLITE_BUSY;
  }
  return p->rc ? SQLITE_ERROR : SQLITE_DONE;
}

//當虛擬機要停機時,調用該函數,如果VDBE改變了數據庫且為自動
//提交模式,則提交這些改變
int sqlite3VdbeHalt(Vdbe *p){
  sqlite3 *db = p->db;
  int i;
  int (*xFunc)(Btree *pBt) = 0;  /* Function to call on each btree backend */
  int isSpecialError;            /* Set to true if SQLITE_NOMEM or IOERR */

  /* This function contains the logic that determines if a statement or
  ** transaction will be committed or rolled back as a result of the
  ** execution of this virtual machine. 
  **
  ** Special errors:
  **
  **     If an SQLITE_NOMEM error has occured in a statement that writes to
  **     the database, then either a statement or transaction must be rolled
  **     back to ensure the tree-structures are in a consistent state. A
  **     statement transaction is rolled back if one is open, otherwise the
  **     entire transaction must be rolled back.
  **
  **     If an SQLITE_IOERR error has occured in a statement that writes to
  **     the database, then the entire transaction must be rolled back. The
  **     I/O error may have caused garbage to be written to the journal 
  **     file. Were the transaction to continue and eventually be rolled 
  **     back that garbage might end up in the database file.
  **     
  **     In both of the above cases, the Vdbe.errorAction variable is 
  **     ignored. If the sqlite3.autoCommit flag is false and a transaction
  **     is rolled back, it will be set to true.
  **
  ** Other errors:
  **
  ** No error:
  **
  */

  if( sqlite3MallocFailed() ){
    p->rc = SQLITE_NOMEM;
  }
  if( p->magic!=VDBE_MAGIC_RUN ){
    /* Already halted.  Nothing to do. */
    assert( p->magic==VDBE_MAGIC_HALT );
    return SQLITE_OK;
  }
  //釋放虛擬機中所有的游標
  closeAllCursors(p);
  checkActiveVdbeCnt(db);

  /* No commit or rollback needed if the program never started */
  if( p->pc>=0 ){

    /* Check for one of the special errors - SQLITE_NOMEM or SQLITE_IOERR */
    isSpecialError = ((p->rc==SQLITE_NOMEM || p->rc==SQLITE_IOERR)?1:0);
    if( isSpecialError ){
      /* This loop does static analysis of the query to see which of the
      ** following three categories it falls into:
      **
      **     Read-only
      **     Query with statement journal
      **     Query without statement journal
      **
      ** We could do something more elegant than this static analysis (i.e.
      ** store the type of query as part of the compliation phase), but 
      ** handling malloc() or IO failure is a fairly obscure edge case so 
      ** this is probably easier. Todo: Might be an opportunity to reduce 
      ** code size a very small amount though
      */
      int isReadOnly = 1;
      int isStatement = 0;
      assert(p->aOp || p->nOp==0);
      for(i=0; i<p->nOp; i++){ 
        switch( p->aOp[i].opcode ){
          case OP_Transaction:
            isReadOnly = 0;
            break;
          case OP_Statement:
            isStatement = 1;
            break;
        }
      }
  
      /* If the query was read-only, we need do no rollback at all. Otherwise,
      ** proceed with the special handling.
      */
      if( !isReadOnly ){
        if( p->rc==SQLITE_NOMEM && isStatement ){
          xFunc = sqlite3BtreeRollbackStmt;
        }else{
          /* We are forced to roll back the active transaction. Before doing
          ** so, abort any other statements this handle currently has active.
          */
          sqlite3AbortOtherActiveVdbes(db, p);
          sqlite3RollbackAll(db);
          db->autoCommit = 1;
        }
      }
    }
  
    /* If the auto-commit flag is set and this is the only active vdbe, then
    ** we do either a commit or rollback of the current transaction. 
    **
    ** Note: This block also runs if one of the special errors handled 
    ** above has occured. 
    */
    //如果自動提交事務,則提交事務
    if( db->autoCommit && db->activeVdbeCnt==1 ){
      if( p->rc==SQLITE_OK || (p->errorAction==OE_Fail && !isSpecialError) ){
      /* The auto-commit flag is true, and the vdbe program was 
        ** successful or hit an 'OR FAIL' constraint. This means a commit 
        ** is required.
        */
        //提交事務
        int rc = vdbeCommit(db);
        if( rc==SQLITE_BUSY ){
          return SQLITE_BUSY;
        }else if( rc!=SQLITE_OK ){
          p->rc = rc;
          sqlite3RollbackAll(db);
        }else{
          sqlite3CommitInternalChanges(db);
        }
      }else{
        sqlite3RollbackAll(db);
      }
    }else if( !xFunc ){
      if( p->rc==SQLITE_OK || p->errorAction==OE_Fail ){
        xFunc = sqlite3BtreeCommitStmt;
      }else if( p->errorAction==OE_Abort ){
        xFunc = sqlite3BtreeRollbackStmt;
      }else{
        sqlite3AbortOtherActiveVdbes(db, p);
        sqlite3RollbackAll(db);
        db->autoCommit = 1;
      }
    }
  
    /* If xFunc is not NULL, then it is one of sqlite3BtreeRollbackStmt or
    ** sqlite3BtreeCommitStmt. Call it once on each backend. If an error occurs
    ** and the return code is still SQLITE_OK, set the return code to the new
    ** error value.
    */
    assert(!xFunc ||
      xFunc==sqlite3BtreeCommitStmt ||
      xFunc==sqlite3BtreeRollbackStmt
    );
    for(i=0; xFunc && i<db->nDb; i++){ 
      int rc;
      Btree *pBt = db->aDb[i].pBt;
      if( pBt ){
        rc = xFunc(pBt);
        if( rc && (p->rc==SQLITE_OK || p->rc==SQLITE_CONSTRAINT) ){
          p->rc = rc;
          sqlite3SetString(&p->zErrMsg, 0);
        }
      }
    }
  
    /* If this was an INSERT, UPDATE or DELETE and the statement was committed, 
    ** set the change counter. 
    */
    if( p->changeCntOn && p->pc>=0 ){
      if( !xFunc || xFunc==sqlite3BtreeCommitStmt ){
        sqlite3VdbeSetChanges(db, p->nChange);
      }else{
        sqlite3VdbeSetChanges(db, 0);
      }
      p->nChange = 0;
    }
  
    /* Rollback or commit any schema changes that occurred. */
    if( p->rc!=SQLITE_OK && db->flags&SQLITE_InternChanges ){
      sqlite3ResetInternalSchema(db, 0);
      db->flags = (db->flags | SQLITE_InternChanges);
    }
  }

  /* We have successfully halted and closed the VM.  Record this fact. */
  if( p->pc>=0 ){
    db->activeVdbeCnt--;
  }
  p->magic = VDBE_MAGIC_HALT;
  checkActiveVdbeCnt(db);

  return SQLITE_OK;
}

//提交事務,主要調用:
//sqlite3BtreeSync()--同步btree, sqlite3BtreeCommit()---提交事務
static int vdbeCommit(sqlite3 *db){
  int i;
  int nTrans = 0;  /* Number of databases with an active write-transaction */
  int rc = SQLITE_OK;
  int needXcommit = 0;

  for(i=0; i<db->nDb; i++){ 
    Btree *pBt = db->aDb[i].pBt;
    if( pBt && sqlite3BtreeIsInTrans(pBt) ){
      needXcommit = 1;
      if( i!=1 ) nTrans++;
    }
  }

  /* If there are any write-transactions at all, invoke the commit hook */
  if( needXcommit && db->xCommitCallback ){
    sqlite3SafetyOff(db);
    rc = db->xCommitCallback(db->pCommitArg);
    sqlite3SafetyOn(db);
    if( rc ){
      return SQLITE_CONSTRAINT;
    }
  }

  /* The simple case - no more than one database file (not counting the
  ** TEMP database) has a transaction active.   There is no need for the
  ** master-journal.
  **
  ** If the return value of sqlite3BtreeGetFilename() is a zero length
  ** string, it means the main database is :memory:.  In that case we do
  ** not support atomic multi-file commits, so use the simple case then
  ** too.
  */
  //簡單的情況,只有一個數據庫文件,不需要master-journal
  if( 0==strlen(sqlite3BtreeGetFilename(db->aDb[0].pBt)) || nTrans<=1 ){
    for(i=0; rc==SQLITE_OK && i<db->nDb; i++){ 
      Btree *pBt = db->aDb[i].pBt;
      if( pBt ){
          //同步btree
        rc = sqlite3BtreeSync(pBt, 0);
      }
    }

    /* Do the commit only if all databases successfully synced */
    //commite事務
    if( rc==SQLITE_OK ){
      for(i=0; i<db->nDb; i++){
        Btree *pBt = db->aDb[i].pBt;
        if( pBt ){
          sqlite3BtreeCommit(pBt);
        }
      }
    }
  }

  /* The complex case - There is a multi-file write-transaction active.
  ** This requires a master journal file to ensure the transaction is
  ** committed atomicly.
  */
#ifndef SQLITE_OMIT_DISKIO
  else{
    int needSync = 0;
    char *zMaster = 0;   /* File-name for the master journal */
    char const *zMainFile = sqlite3BtreeGetFilename(db->aDb[0].pBt);
    OsFile *master = 0;

    /* Select a master journal file name */
    do {
      u32 random;
      sqliteFree(zMaster);
      sqlite3Randomness(sizeof(random), &random);
      zMaster = sqlite3MPrintf("%s-mj%08X", zMainFile, random&0x7fffffff);
      if( !zMaster ){
        return SQLITE_NOMEM;
      }
    }while( sqlite3OsFileExists(zMaster) );

    /* Open the master journal. */
    rc = sqlite3OsOpenExclusive(zMaster, &master, 0);
    if( rc!=SQLITE_OK ){
      sqliteFree(zMaster);
      return rc;
    }
 
    /* Write the name of each database file in the transaction into the new
    ** master journal file. If an error occurs at this point close
    ** and delete the master journal file. All the individual journal files
    ** still have 'null' as the master journal pointer, so they will roll
    ** back independently if a failure occurs.
    */
    for(i=0; i<db->nDb; i++){ 
      Btree *pBt = db->aDb[i].pBt;
      if( i==1 ) continue;   /* Ignore the TEMP database */
      if( pBt && sqlite3BtreeIsInTrans(pBt) ){
        char const *zFile = sqlite3BtreeGetJournalname(pBt);
        if( zFile[0]==0 ) continue;  /* Ignore :memory: databases */
        if( !needSync && !sqlite3BtreeSyncDisabled(pBt) ){
          needSync = 1;
        }
        rc = sqlite3OsWrite(master, zFile, strlen(zFile)+1);
        if( rc!=SQLITE_OK ){
          sqlite3OsClose(&master);
          sqlite3OsDelete(zMaster);
          sqliteFree(zMaster);
          return rc;
        }
      }
    }


    /* Sync the master journal file. Before doing this, open the directory
    ** the master journal file is store in so that it gets synced too.
    */
    zMainFile = sqlite3BtreeGetDirname(db->aDb[0].pBt);
    rc = sqlite3OsOpenDirectory(master, zMainFile);
    if( rc!=SQLITE_OK ||
          (needSync && (rc=sqlite3OsSync(master,0))!=SQLITE_OK) ){
      sqlite3OsClose(&master);
      sqlite3OsDelete(zMaster);
      sqliteFree(zMaster);
      return rc;
    }

    /* Sync all the db files involved in the transaction. The same call
    ** sets the master journal pointer in each individual journal. If
    ** an error occurs here, do not delete the master journal file.
    **
    ** If the error occurs during the first call to sqlite3BtreeSync(),
    ** then there is a chance that the master journal file will be
    ** orphaned. But we cannot delete it, in case the master journal
    ** file name was written into the journal file before the failure
    ** occured.
    */
    for(i=0; i<db->nDb; i++){ 
      Btree *pBt = db->aDb[i].pBt;
      if( pBt && sqlite3BtreeIsInTrans(pBt) ){
        rc = sqlite3BtreeSync(pBt, zMaster);
        if( rc!=SQLITE_OK ){
          sqlite3OsClose(&master);
          sqliteFree(zMaster);
          return rc;
        }
      }
    }
    sqlite3OsClose(&master);

    /* Delete the master journal file. This commits the transaction. After
    ** doing this the directory is synced again before any individual
    ** transaction files are deleted.
    */
    rc = sqlite3OsDelete(zMaster);
    assert( rc==SQLITE_OK );
    sqliteFree(zMaster);
    zMaster = 0;
    rc = sqlite3OsSyncDirectory(zMainFile);
    if( rc!=SQLITE_OK ){
      /* This is not good. The master journal file has been deleted, but
      ** the directory sync failed. There is no completely safe course of
      ** action from here. The individual journals contain the name of the
      ** master journal file, but there is no way of knowing if that
      ** master journal exists now or if it will exist after the operating
      ** system crash that may follow the fsync() failure.
      */
      return rc;
    }

    /* All files and directories have already been synced, so the following
    ** calls to sqlite3BtreeCommit() are only closing files and deleting
    ** journals. If something goes wrong while this is happening we don't
    ** really care. The integrity of the transaction is already guaranteed,
    ** but some stray 'cold' journals may be lying around. Returning an
    ** error code won't help matters.
    */
    for(i=0; i<db->nDb; i++){ 
      Btree *pBt = db->aDb[i].pBt;
      if( pBt ){
        sqlite3BtreeCommit(pBt);
      }
    }
  }
#endif

  return rc;
}

Page Cache之並發控制

  pager層是SQLite實現最為核心的模塊,它具有四大功能:I/O、頁面緩存、並發控制和日志恢復。而這些功能不僅是上層Btree的基礎,而且對系統的性能和健壯性有至關重要的影響。其中並發控制和日志恢復是事務處理實現的基礎。SQLite並發控制的機制非常簡單——即封鎖機制;另外,它的查詢優化機制也非常簡單——基於索引。這一切使得整個SQLite的實現變得簡單,同時變得很小,保證其運行速度非常快,所以特別適合嵌入式設備。SQLite是基於鎖來實現並發控制的,其鎖機制實現得非常簡單而巧妙。

  SQLite的並發控制機制是采用加鎖的方式,實現簡單,也非常巧妙,如下圖所示:

 

    

1、RESERVED LOCK
  RESERVED鎖意味着進程將要對數據庫進行寫操作。某一時刻只能有一個RESERVED Lock,但是RESERVED鎖和SHARED鎖可以共存,而且可以對數據庫加新的SHARED鎖。
  為什么要用RESERVED鎖?
  主要是出於並發性的考慮。由於SQLite只有庫級排斥鎖(EXCLUSIVE LOCK),如果寫事務一開始就上EXCLUSIVE鎖,然后再進行實際的數據更新,寫磁盤操作,這會使得並發性大大降低。而SQLite一旦得到數據庫的RESERVED鎖,就可以對緩存中的數據進行修改,而與此同時,其它進程可以繼續進行讀操作。直到真正需要寫磁盤時才對數據庫加EXCLUSIVE鎖。

2、PENDING LOCK
  PENDING LOCK意味着進程已經完成緩存中的數據修改,並想立即將更新寫入磁盤。它將等待此時已經存在的讀鎖事務完成,但是不允許對數據庫加新的SHARED LOCK(這與RESERVED LOCK相區別)。
  為什么要有PENDING LOCK?
  主要是為了防止出現寫餓死的情況。由於寫事務先要獲取RESERVED LOCK,所以可能一直產生新的SHARED LOCK,使得寫事務發生餓死的情況。

3、加鎖機制的具體實現

  SQLite在pager層獲取鎖的函數如下:

//獲取一個文件的鎖,如果忙則重復該操作,
//直到busy回調函數返回flase,或者成功獲得鎖
static int pager_wait_on_lock(Pager *pPager, int locktype){
  int rc;
  assert( PAGER_SHARED==SHARED_LOCK );
  assert( PAGER_RESERVED==RESERVED_LOCK );
  assert( PAGER_EXCLUSIVE==EXCLUSIVE_LOCK );
  if( pPager->state>=locktype ){
    rc = SQLITE_OK;
  }else{
    //重復直到獲得鎖
    do {
      rc = sqlite3OsLock(pPager->fd, locktype);
    }while( rc==SQLITE_BUSY && sqlite3InvokeBusyHandler(pPager->pBusyHandler) );
    
    if( rc==SQLITE_OK ){
        
      //設置pager的狀態
      pPager->state = locktype;
    }
  }
  return rc;
}

  Windows下具體的實現如下:

static int winLock(OsFile *id, int locktype){
  int rc = SQLITE_OK;    /* Return code from subroutines */
  int res = 1;           /* Result of a windows lock call */
  int newLocktype;       /* Set id->locktype to this value before exiting */
  int gotPendingLock = 0;/* True if we acquired a PENDING lock this time */
  winFile *pFile = (winFile*)id;

  assert( pFile!=0 );
  TRACE5("LOCK %d %d was %d(%d)\n",
          pFile->h, locktype, pFile->locktype, pFile->sharedLockByte);

  /* If there is already a lock of this type or more restrictive on the
  ** OsFile, do nothing. Don't use the end_lock: exit path, as
  ** sqlite3OsEnterMutex() hasn't been called yet.
  */
  //當前的鎖>=locktype,則返回
  if( pFile->locktype>=locktype ){
    return SQLITE_OK;
  }

  /* Make sure the locking sequence is correct
  */
  assert( pFile->locktype!=NO_LOCK || locktype==SHARED_LOCK );
  assert( locktype!=PENDING_LOCK );
  assert( locktype!=RESERVED_LOCK || pFile->locktype==SHARED_LOCK );

  /* Lock the PENDING_LOCK byte if we need to acquire a PENDING lock or
  ** a SHARED lock.  If we are acquiring a SHARED lock, the acquisition of
  ** the PENDING_LOCK byte is temporary.
  */
  newLocktype = pFile->locktype;
  /*兩種情況: (1)如果當前文件處於無鎖狀態(獲取讀鎖--讀事務
  **和寫事務在最初階段都要經歷的階段),
  **(2)處於RESERVED_LOCK,且請求的鎖為EXCLUSIVE_LOCK(寫事務)
  **則對執行加PENDING_LOCK
  */
  /////////////////////(1)///////////////////
  if( pFile->locktype==NO_LOCK
   || (locktype==EXCLUSIVE_LOCK && pFile->locktype==RESERVED_LOCK)
  ){
    int cnt = 3;
    //加pending鎖
    while( cnt-->0 && (res = LockFile(pFile->h, PENDING_BYTE, 0, 1, 0))==0 ){
      /* Try 3 times to get the pending lock.  The pending lock might be
      ** held by another reader process who will release it momentarily.
      */
      TRACE2("could not get a PENDING lock. cnt=%d\n", cnt);
      Sleep(1);
    }
    //設置為gotPendingLock為1,使和在后面要釋放PENDING鎖
    gotPendingLock = res;
  }

  /* Acquire a shared lock
  */
  /*獲取shared lock
  **此時,事務應該持有PENDING鎖,而PENDING鎖作為事務從UNLOCKED到
  **SHARED_LOCKED的一個過渡,所以事務由PENDING->SHARED
  **此時,實際上鎖處於兩個狀態:PENDING和SHARED,
  **直到后面釋放PENDING鎖后,才真正處於SHARED狀態
  */
  ////////////////(2)/////////////////////////////////////
  if( locktype==SHARED_LOCK && res ){
    assert( pFile->locktype==NO_LOCK );
    res = getReadLock(pFile);
    if( res ){
      newLocktype = SHARED_LOCK;
    }
  }

  /* Acquire a RESERVED lock
  */
  /*獲取RESERVED
  **此時事務持有SHARED_LOCK,變化過程為SHARED->RESERVED。
  **RESERVED鎖的作用就是為了提高系統的並發性能
  */
  ////////////////////////(3)/////////////////////////////////
  if( locktype==RESERVED_LOCK && res ){
    assert( pFile->locktype==SHARED_LOCK );
    //加RESERVED鎖
    res = LockFile(pFile->h, RESERVED_BYTE, 0, 1, 0);
    if( res ){
      newLocktype = RESERVED_LOCK;
    }
  }

  /* Acquire a PENDING lock
  */
  /*獲取PENDING鎖
  **此時事務持有RESERVED_LOCK,且事務申請EXCLUSIVE_LOCK
  **變化過程為:RESERVED->PENDING。
  **PENDING狀態只是唯一的作用就是防止寫餓死.
  **讀事務不會執行該代碼,但是寫事務會執行該代碼,
  **執行該代碼后gotPendingLock設為0,后面就不會釋放PENDING鎖。
  */
  //////////////////////////////(4)////////////////////////////////
  if( locktype==EXCLUSIVE_LOCK && res ){
    //這里沒有實際的加鎖操作,只是把鎖的狀態改為PENDING狀態
    newLocktype = PENDING_LOCK;
    //設置了gotPendingLock,后面就不會釋放PENDING鎖了,
    //相當於加了PENDING鎖,實際上是在開始處加的PENDING鎖
    gotPendingLock = 0;
  }

  /* Acquire an EXCLUSIVE lock
  */
  /*獲取EXCLUSIVE鎖
  **當一個事務執行該代碼時,它應該滿足以下條件:
  **(1)鎖的狀態為:PENDING (2)是一個寫事務
  **變化過程:PENDING->EXCLUSIVE
  */
  /////////////////////////(5)///////////////////////////////////////////
  if( locktype==EXCLUSIVE_LOCK && res ){
    assert( pFile->locktype>=SHARED_LOCK );
    res = unlockReadLock(pFile);
    TRACE2("unreadlock = %d\n", res);
    res = LockFile(pFile->h, SHARED_FIRST, 0, SHARED_SIZE, 0);
    if( res ){
      newLocktype = EXCLUSIVE_LOCK;
    }else{
      TRACE2("error-code = %d\n", GetLastError());
    }
  }

  /* If we are holding a PENDING lock that ought to be released, then
  ** release it now.
  */
  /*此時事務在第2步中獲得PENDING鎖,它將申請SHARED_LOCK(第3步,和圖形相對照),
  **而在之前它已經獲取了PENDING鎖,
  **所以在這里它需要釋放PENDING鎖,此時鎖的變化為:PENDING->SHARED
  */
  //////////////////////////(6)/////////////////////////////////////
  if( gotPendingLock && locktype==SHARED_LOCK ){
    UnlockFile(pFile->h, PENDING_BYTE, 0, 1, 0);
  }

  /* Update the state of the lock has held in the file descriptor then
  ** return the appropriate result code.
  */
  if( res ){
    rc = SQLITE_OK;
  }else{
    TRACE4("LOCK FAILED %d trying for %d but got %d\n", pFile->h,
           locktype, newLocktype);
    rc = SQLITE_BUSY;
  }
  //在這里設置文件鎖的狀態
  pFile->locktype = newLocktype;
  return rc;
}

  在幾個關鍵的部位標記數字。

(I)對於一個讀事務會的完整經過:
語句序列:(1)——>(2)——>(6)
相應的狀態真正的變化過程為:UNLOCKED→PENDING(1)→PENDING、SHARED(2)→SHARED(6)→UNLOCKED

(II)對於一個寫事務完整經過:
第一階段:
語句序列:(1)——>(2)——>(6)
狀態變化:UNLOCKED→PENDING(1)→PENDING、SHARED(2)→SHARED(6)。此時事務獲得SHARED LOCK。
第二個階段:
語句序列:(3)
此時事務獲得RESERVED LOCK。
第三個階段:
事務執行修改操作。
第四個階段:
語句序列:(1)——>(4)——>(5)
狀態變化為:
RESERVED→ RESERVED 、PENDING(1)→PENDING(4)→EXCLUSIVE(5)。此時事務獲得排斥鎖,就可以進行寫磁盤操作了。

  注:在上面的過程中,由於(1)的執行,使得某些時刻SQLite處於兩種狀態,但它持續的時間很短,從某種程度上來說可以忽略,但是為了把問題說清楚,在這里描述了這一微妙而巧妙的過程。

4、SQLite的死鎖問題
  SQLite的加鎖機制會不會出現死鎖?
  這是一個很有意思的問題,對於任何采取加鎖作為並發控制機制的DBMS都得考慮這個問題。有兩種方式處理死鎖問題:(1)死鎖預防(deadlock prevention)(2)死鎖檢測(deadlock detection)與死鎖恢復(deadlock recovery)。SQLite采取了第一種方式,如果一個事務不能獲取鎖,它會重試有限次(這個重試次數可以由應用程序運行預先設置,默認為1次)——這實際上是基本鎖超時的機制。如果還是不能獲取鎖,SQLite返回SQLITE_BUSY錯誤給應用程序,應用程序此時應該中斷,之后再重試;或者中止當前事務。雖然基於鎖超時的機制簡單,容易實現,但是它的缺點也是明顯的——資源浪費。

5、事務類型(Transaction Types)
  既然SQLite采取了這種機制,所以應用程序得處理SQLITE_BUSY錯誤,先來看一個會產生SQLITE_BUSY錯誤的例子:

    

  所以應用程序應該盡量避免產生死鎖,那么應用程序如何做可以避免死鎖的產生呢?
  答案就是為你的程序選擇正確合適的事務類型。
  SQLite有三種不同的事務類型,這不同於鎖的狀態。事務可以從DEFERRED、IMMEDIATE或者EXCLUSIVE,一個事務的類型在BEGIN命令中指定:

BEGIN [ DEFERRED | IMMEDIATE | EXCLUSIVE ] TRANSACTION;

  一個deferred事務不獲取任何鎖,直到它需要鎖的時候,而且BEGIN語句本身也不會做什么事情——它開始於UNLOCK狀態;默認情況下是這樣的。如果僅僅用BEGIN開始一個事務,那么事務就是DEFERRED的,同時它不會獲取任何鎖,當對數據庫進行第一次讀操作時,它會獲取SHARED LOCK;同樣,當進行第一次寫操作時,它會獲取RESERVED LOCK。
  由BEGIN開始的Immediate事務會試着獲取RESERVED LOCK。如果成功,BEGIN IMMEDIATE保證沒有別的連接可以寫數據庫。但是,別的連接可以對數據庫進行讀操作,但是RESERVED LOCK會阻止其它的連接BEGIN IMMEDIATE或者BEGIN EXCLUSIVE命令,SQLite會返回SQLITE_BUSY錯誤。這時你就可以對數據庫進行修改操作,但是你不能提交,當你COMMIT時,會返回SQLITE_BUSY錯誤,這意味着還有其它的讀事務沒有完成,得等它們執行完后才能提交事務。
  Exclusive事務會試着獲取對數據庫的EXCLUSIVE鎖。這與IMMEDIATE類似,但是一旦成功,EXCLUSIVE事務保證沒有其它的連接,所以就可對數據庫進行讀寫操作了。
  上面那個例子的問題在於兩個連接最終都想寫數據庫,但是他們都沒有放棄各自原來的鎖,最終,shared鎖導致了問題的出現。如果兩個連接都以BEGIN IMMEDIATE開始事務,那么死鎖就不會發生。在這種情況下,在同一時刻只能有一個連接進入BEGIN IMMEDIATE,其它的連接就得等待。BEGIN IMMEDIATE和BEGIN EXCLUSIVE通常被寫事務使用。就像同步機制一樣,它防止了死鎖的產生。
  基本的准則是:如果你在使用的數據庫沒有其它的連接,用BEGIN就足夠了。但是,如果你使用的數據庫在其它的連接也要對數據庫進行寫操作,就得使用BEGIN IMMEDIATE或BEGIN EXCLUSIVE開始你的事務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM