多線程應用中,由於多個線程的存在,線程之間可能需要訪問同一個變量,或者一個線程可能需要等待另外一個線程完成某個操作后才產生相應的動作。
如:線程中計算量大的幾條代碼段,執行較長時間,不希望在執行過程中被其他線程打斷,需要保護起來,這就是線程同步的概念。
Qt中,有多個類可以實現線程同步的功能,包括QMutex、QMutexLocker、QReadWriterLock、QReadLocker、QWriterLocker、QWaitCondition、QSemaphore。
還提供了QtConcurrent高級API實現多線程編程,同時實現多線程程序可以自動根據處理器內核個數調整線程個數。
1.QMutex和QMutexLocker 是基於互斥量的線程同步類。QMutex定義的實例是一個互斥量
QMutex主要提供3個函數:
1)lock():鎖定互斥量,如果另一個線程鎖定該互斥量,它將阻塞執行直到其他線程解鎖這個互斥量
2)unlock():解鎖互斥量
3)tryLock():試圖鎖定一個互斥量,如果成功就返回true,如果已被其他線程鎖定,返回false,但不會阻塞程序執行
//mythread類中:
private:
QMutex mutex // 信號量
將原函數中 主線程通過信號槽得到線程處理數據(線程處理 emit(value))改為
主線程不斷查詢讀取新數據
1 run() 2 3 { 4 5 //// 6 7 if(xxx){ 8 9 mutex.lock(); 10 11 //數據處理 12 13 mutex.unlock(); 14 15 } 16 17 } 18 19 bool xxx::readValue() 20 21 { 22 23 if(mutex.tryLock()) 24 25 { 26 27 //xxxxxxxxxxxxxxxxxxx 28 29 mutex.unlock(); 30 31 return true; 32 33 } 34 35 else 36 37 return false 38 39 }
2. QMutexLocker 是簡化互斥量處理的類。QMutexLocker構造函數接收一個互斥量作為參數並將其鎖定,QMutexLocker的析構函數則將此互斥量解鎖。在QMutexLocker實例變量生存期內的代碼段得到保護,自動進入互斥量的鎖定和解鎖。
上文例子改為:
1 run() 2 3 { 4 5 //// 6 7 if(xxx){ 8 9 QMutexLocker Locker(&mutex); // mutex.lock(); 10 11 //數據處理 12 13 // mutex.unlock(); 14 15 } 16 17 } 18 19
3. QReadWriteLock
每次只有一個線程獲得互斥量的使用權限,一個程序中如果有多個線程讀取某個變量,使用互斥量時候也必須排隊。若只是讀取一個變量,多線程可以同時訪問,否則互斥量就會降低程序性能。
QReadWriteLock以讀或者寫鎖定的同步方法允許以讀或寫的方式保護一段代碼,可以允許多個線程以只讀方式同步訪問資源,但是只要有一個線程以寫方式訪問資源,其他線程就必須等待直到寫操作結束。
提供以下幾個函數:
lockForRead(): 以只讀方式鎖定資源,如果有其他線程以寫入方式鎖定,這個函數會阻塞
lockForWrite(): 以寫入方式鎖定資源,如果本線程和其他線程以讀或寫模式鎖定資源,這個函數會阻塞
unlock():解鎖
tryLockForRead():lockForRead非阻塞版本
tryLockForWrite():lockForWrite非阻塞版本
例如:
1 QReadWriteLock Lock; 2 3 threadA : run() 4 5 { 6 7 Lock.lockForWrite() //B和C同時被阻塞 8 9 //寫數據 10 11 Lock.unlock() 12 13 } 14 15 16 17 threadB: run() 18 19 { 20 21 Lock.lockForRead() //A無法以寫入模式鎖定,被阻塞 22 23 //讀數據 24 25 Lock.unlock() 26 27 } 28 29 threadC : run() 30 31 { 32 33 Lock.lockForRead() 34 35 //讀數據 36 37 Lock.unlock() 38 39 }
4. QReadLocker 和 QWriteLocker是 QReadWriteLock的簡便形式 類似QMutexLocker是QMutex的簡便版本一樣。
1 QReadWriteLock Lock; 2 3 threadA : run() 4 5 { 6 7 QWriteLocer Locker(&Lock) //B和C同時被阻塞 8 9 //寫數據 10 11 12 13 } 14 15 16 17 threadB: run() 18 19 { 20 21 QReadLocer Locker(&Lock) //A無法以寫入模式鎖定,被阻塞 22 23 //讀數據 24 25 26 27 } 28 29 threadC : run() 30 31 { 32 33 QReadLocer Locker(&Lock) 34 35 //讀數據 36 37 38 39 }
5.QWaitCondition 一般用於生產者/消費者(producer / consumer)模型中:生產者產生數據,消費者使用數據
多線程程序中,多個線程的同步實際就是他們之間的協調問題。
以上方案在一個線程解鎖資源以后,不能及時通知其他線程
QWaitCondition提供了另外一種改進的線程同步方法:
QWaitCondition和QMutex結合,可以使得一個線程在滿足一定條件時候通知其他多個線程,是他們及時作出相應,效率比只使用互斥量要高
提供以下函數:
1)wait(QMutex *lockedMutex)解鎖互斥量lockedMutex,並阻塞等待喚醒條件,被喚醒后鎖定lockedMutex並退出函數
2)wakeAll() 喚醒所有處於等待狀態的線程,線程喚醒的順序不確定,有操作系統的調度策略決定
3)wakeOne()喚醒一個處於等待狀態的線程,喚醒哪個不確定,由操作系統的調度策略決定
1 QMutex mutex 2 3 QWaitConditon newdataAvailable 4 5 threadA::run() 6 7 { 8 9 mutex,lock() 10 11 //獲取數據 12 13 mutex.unlock() 14 15 newdataAvaliable.wakeAll() //喚醒所有線程由新數據了 16 17 } 18 19 20 21 threadB::run() 22 23 { 24 25 mutex.lock() 26 27 newdataAvailable.wat(&mutex) //先解鎖mutex 使得其他線程可以使用mutex 28 29 emit newValue() //讀取數據,發射信號的方式把數據傳遞出去 30 31 mutex.unlock() 32 33 } 34 35 36 37 注意:threadConsumer.start() //應該先啟動,先進入wait狀態 38 39 threadProducer.start() // 應該后啟動,內部wakeAll時候,threadConsumer可以及時響應,否則會丟失數據 40 41 42 43 threadProducer.stopThread() 44 45 threadProducer.wait() 46 47 threadConsumer.terminate() //可能處於等待狀態 用terminate強制結束 48 49 threadConsumer,wait() 50 51
7 Semaphore 基於信號量的線程同步
Semaphore是另一種限制對共享資源進行訪問的線程同步機制,與Mutex相似但是有區別
一個Mutex只能被鎖定一次,信號量可以多次使用。信號量通常用來保護一定數量的相同的資源,如數據采集時的雙緩沖區。
提供以下函數:
1)acquire(int n) 嘗試獲取n個資源,如果沒有這么多資源,程序將阻塞直到有這么多資源
2)release(int n)釋放n個資源,如果信號量的資源已經全部可用之后再release,就可以創建更多的資源,增加可用資源的個數
3)int available()返回當前信號量可用的資環個數,這個數永遠不為負,==0 ,說明當前沒有可用資源
4)bool tryAcquire(int n=1)嘗試獲取n個資源,不成功不阻塞線程
QSemaphore WC(5) //WC.available() = 5
WC.acquire(4) //WC.available() = 1
WC.release(2) //WC.available() = 3
WC.acquire(3) //WC.available() = 0
WC.tryAcquire(1) //WC.available() = 0 返回 false
WC.acquire() //WC.available() = 0 沒有可用資源 阻塞
n個資源就是信號量需要保護的共享資源,至於資源如何分配,就是內部處理的問題
例:
雙緩沖區模擬數據采集卡
1 //共享變量區域: 2 3 const int BufferSize = BUF_SIZE 4 5 int buf1 [BufferSize] 6 7 int buf2 [BufferSize] 8 9 int CurBuf = 1 // 當前正在寫入的Buffer 10 11 int bufNo = 0 //采集的數據區序號 12 13 14 15 QSemaphore emptyBufs[2] //信號量 空的緩沖區個數 初始資源個數為2 16 17 QSemaphore fullBufs //信號量 滿的緩沖區個數 初始資源個數為0 18 19 20 21 22 23 QThreadDA::run() //數據采集線程 24 25 { 26 27 m_stop = false ; //啟動線程時 m_stop = false 28 29 bufNo = 0 ; //緩沖區序號 30 31 curBuf = 1 //當前寫入使用的緩沖區 32 33 int n = emptyBufs.available(); 34 35 if(n < 2) 36 37 emptyBufs.release(2-n) //保證線程啟動的時候empty.avaliable == 2 38 39 while( ! m_stop ) 40 41 { 42 43 emtpyBufs.acquire(); //獲取一個空緩沖區 44 45 數據產生中 for(i=0;i < BufferSize ;i++): 46 47 { if(curBuf == 1) 48 49 buf1【i】 = 新采集數據數據 50 51 else 52 53 buf2【i】 =新采集數據數據 54 55 } 56 57 bufNo++; 58 59 if(curBuf == 1) 60 61 curBuf = 2 62 63 else 64 65 curBuf = 1 66 67 fullBufs.releasse() //有了一個滿的緩沖區 available = 1 68 69 } 70 71 72 73 } 74 75 76 77 //實際情況下,讀取現成的操作應該比采集線程的速度要快 78 79 QThreadShow::run() 80 81 { 82 83 m_stop = false 84 85 int n = fullBufs.available() 86 87 if( n > 0 ) 88 89 fullBufs.acquire(n) // 將fullBuf可用資源個數初始化為0 90 91 while(!m_stop) 92 93 { 94 95 fullBufs.acquire() //等待緩沖區滿 當fullBufs.available = 0 時阻塞 96 97 int data【BufSize】 98 99 int seq = bufNo 100 101 102 103 if(curBuf == 1) //當前在寫入的緩沖區是1 那么滿的緩沖區是2 104 105 for(。。) 106 107 data【i】 = buf2【i】 108 109 else 110 111 112 113 for(。。) 114 115 data【i】 = buf1【i】 116 117 118 119 emptyBufs.release() //釋放一個空緩沖區 120 121 emit newview(data,datasize,seq); 122 123 } 124 125 126 127 }