高效的線程安全隊列ConcurrentQueue (上)


ConcurrentQueue<T>隊列是一個高效的線程安全的隊列,是.Net Framework 4.0,System.Collections.Concurrent命名空間下的一個數據結構。

ConcurrentQueue<T>數據結構

下圖是ConcurrentQueue<T>數據結構的示意圖:

clip_image002

ConcurrentQueue<T>隊列由若干Segment動態構成,每個Segment是一塊連續的內存Buffer,大小固定為SEGMENT_SIZE。

ConcurrentQueue<T>私有成員變量

ConcurrentQueue<T>類有三個私有成員變量:

Segment* volatile m_head;

Segment* volatile m_tail;

Segment* volatile m_base;

m_head指向第一個segment,m_tail指向最后一個segment。這兩個指針指向的對象,隨着入隊列和出隊列操作而不斷變化。

m_base指針固定指向ConcurrentQueue<T>實例化的第一個Segment,在析構ConcurrentQueue<T>對象時使用。

ConcurrentQueue<T>成員方法

void Enqueue(T item)
void Enqueue(T item)
{
	DNetSpinWait wait;
	while (!m_tail->TryAppend(item, &m_tail))
	{
		wait.SpinOnce();
	}
}
從m_tail指向的segment中,加入item的值,直到成功加入,函數返回。 

該函數會在分配了新的segment后,更新m_tail指針。

bool TryDequeue(T* result)
bool TryDequeue(T* result)
{

	while (!IsEmpty())
	{
		if (m_head->TryRemove(result, &m_head))
		{
			return true;
		}   
	}
	result = NULL;
	return false;
}

如果當前隊列為空,返回false,否則返回隊列的第一個元素。

bool TryPeek(T* result)
bool TryPeek(T* result)
{
	while (!IsEmpty())
	{
		if (m_head->TryPeek(result))
		{
			return true;
		}
	}
	result = NULL;
	return false;
}

跟TryDequeue()方法相似。

int Count()
int Count() 
{
	Segment* segment;
	Segment* segment2;
	int num;
	int num2;
	GetHeadTailPositions(&segment, &segment2, &num, &num2);
	if (segment == segment2)
	{
		return ((num2 - num) + 1);
	}
	int num3 = SEGMENT_SIZE - num;
	num3 += SEGMENT_SIZE * (((int) (segment2->GetIndex() - segment->GetIndex())) - 1);
	return (num3 + (num2 + 1));
}

通過得到當前首尾的segment指針,以及首指針的m_low索引,以及尾指針的m_high索引,計算當前隊列中元素的個數。

該方法用到了GetHeadTailPositions方法。

bool IsEmpty()
bool IsEmpty()
{
	Segment* head = m_head;
	if (head->IsEmpty())
	{
		if (head->GetNext() == NULL)
		{
			return true;
		}
		DNetSpinWait wait;
		while (head->IsEmpty())
		{
			if (head->GetNext() == NULL)
			{
				return true;
			}
			wait.SpinOnce();
			head = m_head;
		}
	}
	return false;
}

判定當前隊列為空有兩個條件,第一,m_head指向的segment為空;第二,m_head->GetNext()也為空,即m_head和m_tail指向同一個segment。

void Reset()
void Reset()
{
	DeleteNodes();
	m_base = m_head = m_tail = new Segment(0);
}

重置ConcurrentQueue<T>對象,刪除已經分配了的segment,並重新更新成員變量的值。

void GetHeadTailPositions(Segment** head, Segment** tail, int* headLow, int* tailHigh)
void GetHeadTailPositions(Segment** head, Segment** tail, int* headLow, int* tailHigh)
{
	*head = m_head;
	*tail = m_tail;
	*headLow = (*head)->GetLow();
	*tailHigh = (*tail)->GetHigh();
	DNetSpinWait wait;
	while ((((*head != m_head) || (*tail != m_tail)) || 
		((*headLow != (*head)->GetLow()) || (*tailHigh != (*tail)->GetHigh()))) || 
		((*head)->GetIndex() > (*tail)->GetIndex()))
	{
		wait.SpinOnce();
		*head = m_head;
		*tail = m_tail;
		*headLow = (*head)->GetLow();
		*tailHigh = (*tail)->GetHigh();
	}
}

該函數就是將隊列當前的m_head, m_tail指針以及m_head的m_low索引,m_tail的m_high索引取出來,放到線程棧上。並且在取出這些值后,再判斷這些值是否合法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM