上篇文章嘗試着使用head lock和tail lock分別在Get和Add元素時,對隊列進行上鎖,這樣就避免了每次操作都鎖住整個隊列,縮小了鎖的粒度。這里還有個問題,隊列中持有的T對象指針,均是由調用者動態分配和釋放的,如果調用量特別大,new/delete操作頻繁,同樣會導致性能下降,可能使系統產生大量的內存碎片。對於這個問題,我最開始想到的是讓隊列中不持有原生指針,而是使用帶引用計數的智能指針,但后來想想,這樣只可能避免內存泄露和賦值拷貝時大量內存復制的情況,而隊列中元素只有存取兩種行為,要解決大量的內存分配和釋放操作,這樣的做法顯然不能對性能帶來大的提高。
那么如何才能避免大量的內存分配和釋放呢,仔細思考,想到兩種方式:
- 使用內存池。我們知道,內存池的原理是,“內存分配調用預先一次性申請適當大小的內存作為一個內存池,之后應用程序自己對內存的分配和釋放則可以通過這個內存池來完成。只有當內存池大小需要動態擴展時,才需要再調用系統的內存分配函數,其他時間對內存的一切操作都在應用程序的掌控之中。” 我們可以將自定義的內存池,加入T類的聲明中,對T類的operator new和operator delete進行重載,實際使用內存池的分配和釋放函數,這樣對象分配的內存要比缺省operator new更少,而且運行得更快。需要注意的是,在多線程環境中,內存池有可能被多個線程共享,因此需要在每次分配和釋放內存時加鎖,而這樣一來,元素構造析構時需要加鎖,在隊列中進行Add和Get同樣需要加鎖,是否能夠使性能提高還需要實際的測試來驗證。
- 如果我們不使用動態分配的方式,讓隊列保存棧上的對象,而不是堆中的指針,這樣一來不用管理指針的生命周期,二來不用進行大量的new/delete調用。那么問題是當元素的個數是未知數是,我們可以動態的分配,而如果要將元素對象像數組元素一樣保存在隊列中,我們無法確定這個隊列中這個數組容器的大小,這就需要事先明確了緩沖區的最大容量的情形,可以使用循環隊列來處理。
這里我們主要討論下循環隊列在生產者消費者模式中的應用。循環隊列是一段固定大小的連續內存空間,它保存的元素不需要進行動態的內存釋放和分配,使用固定大小的內存空間反復使用,當一個數據元素被用掉后,其余數據元素不需要移動其存儲位置。
從圖中的循環隊列可以看出,兩個指針head和tail來分別表示讀和寫的位置,開始時隊列為空,head和tail指向第一個元素。向隊列中Add元素時,head指針向后移動,從隊列中Get元素時,tail指針向后移動。當Add操作頻率遠大於Get時,head指針追趕上tail指針,說明隊列中元素已滿,需要等待Get操作,而如果Get操作頻率遠大於Add時,tail元素會追趕上head元素,說明隊列已經空了,需要等待Add操作。這里有兩個阻塞動作,那么我們需要用兩個條件變量分別控制隊列已滿還是隊列為空,讓線程等待。
同時注意如果head和tail指針相等,指向同一個位置時,既可以表示隊列為空,也可以表示隊列已滿,那么如何區分是隊滿還是隊空。我們可以少用一個元素的空間,約定入隊前,測試tail指針在循環意義下加1后是否等於head指針,若相等則認為隊滿。這意味着緩沖區中總是有一個存儲單元保持未使用狀態。緩沖區最多存入size-1個數據。我們對之前的BlockQueue進行修改,來實現循環隊列:
1 #define MAXSIZE 1024
2
3 template<class T>
4 class CircleQueue 5 { 6 public: 7 CircleQueue(unsigned int size = MAXSIZE); 8 ~CircleQueue(); 9
10 bool Add(const T &cData, int timeout = 0); 11 bool Get(T &cData, int timeout = 0); 12 bool IsFull(); 13 bool IsEmpty(); 14 private: 15 T *buffer; 16 int head, tail; 17 unsigned int maxsize; 18 int m_nBlockOnAdd; //阻塞在Add操作中的線程個數
19 int m_nBlockOnGet; //阻塞在Get操作中的線程個數
20 CMutex m_cLock; 21 CCond m_addCond; //隊列滿時,Add操作阻塞
22 CCond m_getCond; //隊列空時,Get操作阻塞
23 }; 24
25 template<class T>
26 CircleQueue<class T>::CircleQueue(unsigned int size):head(0), tail(0), maxsize(size), m_nBlockOnAdd(0), m_nBlockOnGet(0) 27 { 28 buffer = new T[maxsize]; 29 } 30
31 template<class T>
32 CircleQueue<class T>::~CircleQueue() 33 { 34 if(buffer != NULL) 35 delete[] buffer; 36 buffer == NULL; 37 head = tail = 0; 38 m_nBlockOnAdd = m_nBlockOnGet = 0; 39 } 40
41 template<class T>
42 bool CircleQueue<class T>::IsFull() 43 { 44 return (tail+1) % maxsize == head; 45 } 46
47 template<class T>
48 bool CircleQueue<class T>::IsEmpty() 49 { 50 return head == tail; 51 } 52
53 template<class T>
54 bool CircleQueue<class T>::Add(const T &cData, int timeout) 55 { 56 m_cLock.EnterMutex(); 57 while(isFull()) 58 { 59 m_nBlockOnAdd++; 60 if(m_addCond.WaitLock(m_cLock.GetMutex(), timeout) == 1) 61 { 62 m_cLock.LeaveMutex(); 63 m_nBlockOnAdd--; 64 return false; 65 } 66 m_nBlockOnAdd--; 67 } 68
69 buffer[tail] = cData; 70 tail = (tail+1) % maxsize; 71
72 if(m_nBlockOnGet > 0) 73 m_getQueue.Signal(); 74
75 m_cLock.LeaveMutex(); 76
77 return true; 78 } 79
80 template<class T>
81 void CircleQueue<class T>::Get(T &cData, int timeout) 82 { 83 m_cLock.EnterMutex(); 84 while (isEmpty()) 85 { 86 m_nBlockOnGet++; 87 if (m_getCond.WaitLock(m_cLock.GetMutex(), timeout) == 1) 88 { 89 m_cLock.LeaveMutex(); 90 m_nBlockOnGet--; 91 return false; 92 } 93 m_nBlockOnGet--; 94 } 95
96 cData = buffer[head]; 97 head = (head+1) % maxsize; 98
99 if(m_nBlockOnAdd > 0) 100 m_addCond.Signal(); 101
102 m_cLock.LeaveMutex(); 103
104 return true; 105 }
對上面的代碼進行分析,可以看出:
- 代碼中這個阻塞循環隊列,在構造函數中用new操作符新建了一塊連續的內存,而用於存儲的T類型需要提供默認構造函數。
- Add和Get函數都加上超時限制,布爾類型的返回值表示函數是否調用成功,而函數的出參和入參為引用方式,避免了不必要的數據拷貝。
- m_nBlockOnAdd和m_nBlockOnGet分別表示阻塞在Add函數和Get函數的線程個數,通過這兩個值,可以高效的對阻塞線程進行喚醒操作。