Qt同步線程(比較清楚,而且QMutex QMutexLocker QReadWriteLock QSemaphore QWaitCondition 每個都有例子)


Qt同步線程

我們知道,多線程有的時候是很有用的,但是在訪問一些公共的資源或者數據時,需要進行同步,否則會使數據遭到破壞或者獲取的值不正確。Qt提供了一些類來實現線程的同步,如QMutexQMutexLockerQReadWriteLockQReadLockerQWriteLockerQSemaphoreQWaitCondition。下面我們分別來看它們的用法:

QMutex

首先,簡單的了解一下QMutex提供的函數。

構造函數:QMutex ( RecursionMode mode = NonRecursive )。

需要注意的是構造函數的參數,RecursionMode 遞歸模式。枚舉類型RecursionMode 有兩個值:

QMutex::Recursive,在這個模式下,一個線程可以多次鎖同一個互斥量。需要注意的是,調用lock()多少次鎖,就必須相應的調用unlock()一樣次數解鎖。

QMutex::NonRecursive(默認),在這個模式下,一個線程只能鎖互斥量一次。

 

void QMutex::lock ()

該函數用來鎖住一個互斥量。如果另外的線程已經鎖住了互斥量,函數將被阻塞等待另外的線程解鎖互斥量。

如果是一個可遞歸的互斥量,則可以從同一個線程多次調用這個函數,如果是非遞歸的互斥量,多次調用這個函數將會引發死鎖。我們來看看源碼是怎么實現的。

void QMutex::lock()

{

    QMutexPrivate *d = static_cast<QMutexPrivate*>(this->d);

    Qt::HANDLE self;

 

    if(d->recursive) {

        self = QThread::currentThreadId();

        if(d->owner == self) {

            ++d->count;             //同一個線程多次lock時,僅僅自增count

                                   //當然遞歸次數太多也會導致棧溢出

            Q_ASSERT_X(d->count != 0, "QMutex::lock", "Overflowin recursion counter");

            return;

        }

 

        boolisLocked = d->contenders.testAndSetAcquire(0, 1);

        if(!isLocked) {

            // didn'tget the lock, wait for it

            isLocked = d->wait();

            Q_ASSERT_X(isLocked, "QMutex::lock",

                       "Internalerror, infinite wait has timed out.");

        }

 

        d->owner = self;          //遞歸模式時,owner記錄擁有互斥量的線程

        ++d->count;             //記錄lock的次數

        Q_ASSERT_X(d->count != 0, "QMutex::lock", "Overflowin recursion counter");

        return;

    }

    //非遞歸模式時,

    boolisLocked = d->contenders.testAndSetAcquire(0, 1);   //嘗試加鎖

    if(!isLocked) {

        lockInternal();  //加鎖失敗則在lockInternal()中一直等到別的線程解鎖。

    }

}

看看lockInternal的實現

void QMutex::lockInternal()

{

  。。。

    do {

        。。。。//其他代碼太復雜,感覺最重要的就是這個while循環了,

                //一直循環檢測,試圖加鎖。這我們就好理解,非遞歸模式的//互斥量,不要在同一個線程里,多次調用lock了。因為第二次調用的時候會在

//這里死循環了

    } while(d->contenders != 0 || !d->contenders.testAndSetAcquire(0, 1));

 

。。。。。。。

}

 

 

 

bool QMutex::tryLock ()

該函數試圖鎖一個互斥量,如果成功則返回true。如果另外的線程已經鎖住了互斥量,函數直接返回false。

 

bool QMutex::tryLock ( int timeout )

該函數跟上面的trylock()相似。不同的是,如果互斥量在別的線程鎖住的情況下,函數會等待timeout 毫秒。需要注意的是,如果傳入的timeout 為負數,函數將無限期等待,跟調用lock()一樣的效果。這個函數跟上面的差不多,所以只看該函數的源碼實現就好了。

bool QMutex::tryLock(inttimeout)

{

    QMutexPrivate *d = static_cast<QMutexPrivate*>(this->d);

   Qt::HANDLE self;

 

    if(d->recursive) {

        self = QThread::currentThreadId();

        if(d->owner == self) {

            ++d->count;

            Q_ASSERT_X(d->count != 0, "QMutex::tryLock", "Overflow in recursion counter");

            returntrue;

        }

 

        boolisLocked = d->contenders.testAndSetAcquire(0, 1);

        if(!isLocked) {

            // didn'tget the lock, wait for it

            isLocked = d->wait(timeout);    //嘗試加鎖失敗則等待

            if(!isLocked)

                returnfalse;

        }

 

        d->owner = self;

        ++d->count;

        Q_ASSERT_X(d->count != 0, "QMutex::tryLock", "Overflow in recursion counter");

        return true;

    }

//嘗試加鎖失敗,(d->contenders.testAndSetAcquire(0,1)返回false,所以繼續執行d->wait(timeout);

    return (d->contenders.testAndSetAcquire(0, 1) ||d->wait(timeout));

}

//在win下,wait函數實際上是用事件對象實現的

 

bool QMutexPrivate::wait(inttimeout)

{

    if(contenders.fetchAndAddAcquire(1) == 0) {

        // lockacquired without waiting

        return true;

}

// 當timeout 小於0,則等待時間為INFINITE,這也就是為什么傳負數參數時跟lock一樣會無限期等待了

    boolreturnValue = (WaitForSingleObject(event,timeout < 0 ? INFINITE : timeout) ==  WAIT_OBJECT_0);

    contenders.deref();

    returnreturnValue;

}

 

 

void QMutex::unlock ()

該函數對互斥量進行解鎖。如果在另外的線程加鎖,嘗試在別的線程進行解鎖則會引發錯誤。試圖對沒有加鎖的互斥量解鎖結果是未定義的。

 

QMutexLocker

QmutexLocker只是為了簡化我們隊互斥量的加鎖和解鎖操作。就像智能指針方便我們使用普通指針一樣。

QMutexLocker (QMutex * mutex )。

構造函數必須傳入一個互斥量指針,然后在構造函數里mutex直接調用lock()。

inline explicitQMutexLocker(QMutex *m)

    {

        Q_ASSERT_X((reinterpret_cast<quintptr>(m)& quintptr(1u)) == quintptr(0),

                   "QMutexLocker","QMutex pointer is misaligned");

        if (m){

            m->lockInline();    // mutex調用lock()加鎖

            val = reinterpret_cast<quintptr>(m)| quintptr(1u);

        } else{

            val = 0;

        }

}

 

inline ~QMutexLocker() { unlock(); }

inline void unlock()

       {

              if((val & quintptr(1u)) == quintptr(1u)) {

                     val &= ~quintptr(1u);

                     mutex()->unlockInline();   //析構時調用unlock,確保mutex在離開調用線程時被解鎖。

 

              }

       }

下面來看看具體的用法:

假設有個函數有很多return 語句,那么我們就必須記得在每個語句前unlock互斥量,否則互斥量將無法得到解鎖,導致其他等待的線程無法繼續執行。

int complexFunction(intflag)

 {

     mutex.lock();

 

     int retVal = 0;

 

     switch (flag) {

     case 0:

     case1:

         retVal = moreComplexFunction(flag);

         break;

     case 2:

         {

             int status = anotherFunction();

             if (status < 0) {

                 mutex.unlock();

                 return -2;

             }

             retVal = status + flag;

         }

         break;

     default:

         if (flag > 10) {

             mutex.unlock();

             return -1;

         }

         break;

     }

 

     mutex.unlock();

     return retVal;

 }

這樣的代碼顯得很冗余又容易出錯。如果我們用QMutexLocker

intcomplexFunction(int flag)

 {

     QMutexLocker locker(&mutex);

 

     int retVal = 0;

 

     switch (flag) {

     case 0:

     case 1:

         return moreComplexFunction(flag);

     case 2:

         {

             int status = anotherFunction();

             if (status < 0)

                 return -2;

             retVal = status + flag;

         }

         break;

     default:

         if (flag > 10)

             return -1;

         break;

     }

 

     return retVal;

 }

由於locker 是局部變量,在離開函數作用域時,mutex肯定會被解鎖。

 

QreadWriteLock

QreadWriteLock是一個讀寫鎖,主要用來同步保護需要讀寫的資源。當你想多個讀線程可以同時讀取資源,但是只能有一個寫線程操作資源,而其他線程必須等待寫線程完成時,這時候用這個讀寫鎖就很有用了。QreadWriteLock也有遞歸和非遞歸模式之分。

我們主要來看看最重要的兩個函數是如何實現讀寫操作的同步的。

void QReadWriteLock::lockForRead ()

該函數lock接了讀操作的鎖。如果有別的線程已經對lock接了寫操作的鎖,則函數會阻塞等待。

void QReadWriteLock::lockForRead()

{

    QMutexLocker lock(&d->mutex);

 

    Qt::HANDLE self = 0;

    if(d->recursive) {

        self = QThread::currentThreadId();

 

        QHash<Qt::HANDLE, int>::iterator it = d->currentReaders.find(self);

        if (it!= d->currentReaders.end()) {

            ++it.value();

            ++d->accessCount;

            Q_ASSERT_X(d->accessCount >0, "QReadWriteLock::lockForRead()",

                       "Overflowin lock counter");

            return;

        }

    }

// accessCount 小於0說明有寫線程在操作資源,則阻塞

    while(d->accessCount < 0 || d->waitingWriters) {

        ++d->waitingReaders;             //自增等待的讀線程數

       d->readerWait.wait(&d->mutex);

        --d->waitingReaders;

    }

    if(d->recursive)

        d->currentReaders.insert(self, 1);

 

    ++d->accessCount;    //自增,記錄有多少個線程訪問了資源

    Q_ASSERT_X(d->accessCount > 0, "QReadWriteLock::lockForRead()", "Overflow in lock counter");

}

 

 

 

void QReadWriteLock::lockForWrite ()

該函數給lock加了寫操作的鎖,如果別的線程已經加了讀或者寫的鎖,則函數會被阻塞。

 

void QReadWriteLock::lockForWrite()

{

    QMutexLocker lock(&d->mutex);

 

    Qt::HANDLE self = 0;

    if(d->recursive) {

        self = QThread::currentThreadId();

 

        if(d->currentWriter == self) {

            --d->accessCount;

            Q_ASSERT_X(d->accessCount <0, "QReadWriteLock::lockForWrite()",

                       "Overflowin lock counter");

            return;

        }

}

// accessCount不等於0,說明有線程在操作資源,則函數阻塞等待。

// accessCount大於0說明有讀線程在讀取資源,  

// accessCount小於0說明有寫線程在寫數據

 while(d->accessCount != 0) {

        ++d->waitingWriters;        //自增等待的寫線程數

       d->writerWait.wait(&d->mutex);

        --d->waitingWriters;

    }

    if(d->recursive)

        d->currentWriter = self;

 

    --d->accessCount;

    Q_ASSERT_X(d->accessCount < 0, "QReadWriteLock::lockForWrite()", "Overflow in lock counter");

}

 

 

void QReadWriteLock::unlock ()

解鎖函數,下面我們看看源碼是如何實現,讓等待的寫線程優先於讀線程獲得互斥量的鎖的。

void QReadWriteLock::unlock()

{

    QMutexLocker lock(&d->mutex);

 

    Q_ASSERT_X(d->accessCount != 0, "QReadWriteLock::unlock()", "Cannot unlock an unlocked lock");

 

    boolunlocked = false;

    if(d->accessCount > 0) {

        // releasinga read lock

        if(d->recursive) {

            Qt::HANDLE self =QThread::currentThreadId();

            QHash<Qt::HANDLE, int>::iterator it =d->currentReaders.find(self);

            if(it != d->currentReaders.end()) {

                if(--it.value() <= 0)

                   d->currentReaders.erase(it);

            }

        }

        // d->accessCount  說明沒有線程在操作資源了unlocked為true

        unlocked = --d->accessCount == 0;

} else if (d->accessCount < 0 &&++d->accessCount == 0)

{

// d->accessCount <0 說明有寫線程在操作。則解鎖unlocked = true;

        // released awrite lock

        unlocked = true;

        d->currentWriter = 0;

    }

   //最重要的就是這里

    if(unlocked) {

        if(d->waitingWriters) { 

 //如果有寫線程在等待,則wake一個寫線程。前面我們已經知道,寫線程是只

//能有一個對資源進行操作的,所以就wakeone了。

            d->writerWait.wakeOne();

        } else if (d->waitingReaders) {

//如果沒有等待的寫線程,則wake全部的讀線程。因為讀線程是可以多個對資源進行操作的。

            d->readerWait.wakeAll();

        }

    }

}

 

下面是我自己簡單的實現用例:

class Lock:publicQObject

{

       Q_OBJECT

public:

       Lock();

       ~Lock();

 

       voidStart();

 

       voidRead();

       voidWrite();

       voidReadThread1();

       voidReadThread2();

       voidWriteThread1();

       voidWriteThread2();

protected:

private:

       string strResource;

       QReadWriteLock lock;    //非¤?遞ÌY歸¨¦的Ì?

};

 

Lock::Lock()

{

       strResource = "Hellworld ......";

}

 

Lock::~Lock()

{

 

}

 

 

void Lock::Read()

{

       cout<<"Readdata :"<<strResource<<endl;

       QEventLoop loop;

       QTimer::singleShot(2000,&loop,SLOT(quit()));   //為a了¢?使º1得Ì?停ª¡ê留¢?的Ì?時º¡À間?長¡è寫¡ä效¡ì果?好?點Ì?,ê?暫Y停ª¡ê2s

       loop.exec();

}

 

void Lock::Write()

{

       strResource = "writelock ";

       cout<<"Writedata :"<<strResource<<endl;

       QEventLoop loop;

     QTimer::singleShot(2000,&loop,SLOT(quit()));  //為a了¢?使º1得Ì?停ª¡ê留¢?的Ì?時º¡À間?長¡è寫¡ä效¡ì果?好?點Ì?,ê?暫Y停ª¡ê2s

       loop.exec();

}

 

void Lock::ReadThread1()

{

       lock.lockForRead();

       cout<<"ReadThread1  lockForRead"<<endl;

       Read();

       cout<<"ReadThread1  unlock"<<endl;

       lock.unlock();

      

}

 

void Lock::ReadThread2()

{

       lock.lockForRead();

       cout<<"ReadThread2  lockForRead"<<endl;

       Read();

       cout<<"ReadThread2  unlock"<<endl;

       lock.unlock();

      

}

 

void Lock::WriteThread1()

{

       lock.lockForWrite();

       cout<<"WriteThread1  lockForWrite"<<endl;

       Write();

       cout<<"WriteThread1  unlock"<<endl;

       lock.unlock();

      

}

 

void Lock::WriteThread2()

{

       lock.lockForWrite();

       cout<<"WriteThread2  lockForWrite"<<endl;

       Write();

       cout<<"WriteThread2  unlock"<<endl;

       lock.unlock();

      

}

 

void Lock::Start()

{

       QtConcurrent::run(this,&Lock::ReadThread1);

       QtConcurrent::run(this,&Lock::ReadThread2);

 

       QtConcurrent::run(this,&Lock::WriteThread1);

       QtConcurrent::run(this,&Lock::WriteThread2);

}

這里我先啟動兩個讀線程,再啟動寫線程,運行結果如下。我們發現先讀線程1先加了鎖,讀線程1還沒解鎖的時候,讀線程2已經加了鎖,驗證了讀線程是可以同時進入的。

 

 

如果我改一下代碼:

void Lock::Start()

{

       QtConcurrent::run(this,&Lock::WriteThread1);

       QtConcurrent::run(this,&Lock::ReadThread1);

       QtConcurrent::run(this,&Lock::ReadThread2);

       QtConcurrent::run(this,&Lock::WriteThread2);

}

我先啟動WriteThread1,然后啟動兩個讀線程,最后啟動WriteThread2。運行結果如下,我們發現,WriteThread1運行完之后,先運行WriteThread2,最后才是兩個讀線程。驗證了寫線程比讀線程先獲得鎖。

 

 

 

QSemaphore

QSemaphore是提供一個計數的信號量。信號量是泛化的互斥量。一個信號量只能鎖一次,但是我們可以多次獲得信號量。信號量可以用來同步保護一定數量的資源。

信號量支持兩個基本是函數, acquire()和 release():

acquire(n) :嘗試獲取n個資源。如果沒有足夠的可用資源,該函數調用會被則是。

release(n) :釋放n個資源。

 

它們的源碼實現也很簡單:

void QSemaphore::acquire(intn)

{

    Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative");

    QMutexLocker locker(&d->mutex);

    while (n> d->avail)  //申請的資源n 大於可用資源avail則進入等待。

        d->cond.wait(locker.mutex()); 

    d->avail -= n;

}

 

void QSemaphore::release(intn)

{

    Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");

    QMutexLocker locker(&d->mutex);

    d->avail += n;

    d->cond.wakeAll();

}

 

由於avail變量,實際就是一個int的計數變量 。所以我們在調用release()傳入的參數n大於信號量初始值也沒關系,只是說明可用資源增加了。

例如以下代碼:

int main(int argc, char *argv[])

{

       QCoreApplication a(argc, argv);

       QSemaphore sem(5);

      

       sem.acquire(5);        

       cout<<"acquire(5);  "<<"remaindresource :"<<sem.available()<<endl;

       sem.release(5);

       cout<<"release(5)  "<<"remaindresource :"<<sem.available()<<endl;

       sem.release(10);

       cout<<"release(10)  "<<"remaindresource :"<<sem.available()<<endl;

       sem.acquire(15);

       cout<<"acquire(15);  "<<"remaindresource :"<<sem.available()<<endl;

       returna.exec();

}

 

 

信號量最著名的就是生產者與消費者的例子,以后再研究了。

QWaitCondition 

QWaitCondition類提供了一個條件變量,它允許我們通知其他線程,等待的某些條件已經滿足。等待QWaitCondition變量的可以是一個或多個線程。當我們用wakeOne()通知其他線程時,系統會隨機的選中一個等待進行喚醒,讓它繼續運行。其實前面的信號量和讀寫鎖內部實現都有用到QWaitCondition的。

下面我們來看這個類重要的幾個函數:

 

ool QWaitCondition::wait ( QMutex * mutex, unsigned long time =ULONG_MAX )

該函數對mutex解鎖,然后等待。在調用這個函數之前,mutex必須是加鎖狀態。如果mutex沒有加鎖,則函數直接返回。如果mutex是可遞歸的,函數也直接返回。該函數對mutex解鎖,然后等待,知道以下條件之一滿足:

1.     另外的線程調用wakeOne()或 wakeAll(),則該函數會返回true。

2.     時間過了Time毫秒。如果time為ULONG_MAX(默認),則將會一直等待不會超時。如果超時則返回false。

 

bool QWaitCondition::wait ( QReadWriteLock * readWriteLock, unsigned long time =ULONG_MAX )

函數對readWriteLock解鎖並等待條件變量。在調用這個函數之前,readWriteLock必須是加鎖狀態的。如果不是加鎖狀態,則函數立即返回。readWriteLock必須不能是遞歸加鎖的,否則將不能正確的解鎖。返回的滿足條件跟上面的函數一樣。

http://blog.csdn.net/hai200501019/article/details/9889123


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM