DirectX11 With Windows SDK--27 計算着色器:雙調排序


前言

上一章我們用一個比較簡單的例子來嘗試使用計算着色器,但是在看這一章內容之前,你還需要了解下面的內容:

章節
26 計算着色器:入門
深入理解與使用緩沖區資源(結構化緩沖區/有類型緩沖區)
Visual Studio圖形調試器詳細使用教程(編程捕獲部分)

這一章我們繼續用一個計算着色器的應用實例作為切入點,進一步了解相關知識。

DirectX11 With Windows SDK完整目錄

Github項目源碼

歡迎加入QQ群: 727623616 可以一起探討DX11,以及有什么問題也可以在這里匯報。

線程標識符與線程尋址機制

對於線程組(大小(ThreadDimX, ThreadDimY, ThreadDimZ))中的每一個線程,它們都有一個唯一的線程ID值。我們可以使用系統值SV_GroupThreadID來取得,它的索引范圍為(0, 0, 0)(ThreadDimX - 1, ThreadDimY - 1, ThreadDimZ - 1)

而對於整個線程組來說,由於線程組集合也是在3D空間中排布,它們也有一個唯一的線程組ID值。我們可以使用系統值SV_GroupID來取得,線程組的索引范圍取決於調用ID3D11DeviceContext::Dispatch時提供的線程組(大小(GroupDimX, GroupDimY, GroupDimZ)),范圍為(0, 0, 0)(GroupDimX - 1, GroupDimY - 1, GroupDimZ - 1)

緊接着就是系統值SV_GroupIndex,它是單個線程組內的線程三維索引的一維展開。若已知線程組的大小為(ThreadDimX, ThreadDimY, ThreadDimZ),則可以確定SV_GroupIndexSV_GroupThreadID滿足下面關系:

SV_GroupIndex = SV_GroupThreadID.z * ThreadDimX * ThreadDimY + SV_GroupThreadID.y * ThreadDimX + SV_GroupThreadID.x;

最后就是系統值SV_DispatchThreadID,線程組中的每一個線程在ID3D11DeviceContext::Dispatch提供的線程組集合中都有其唯一的線程ID值。若已知線程組的大小為 (ThreadDimX, ThreadDimY, ThreadDimZ),則可以確定SV_DispatchThreadIDSV_GroupThreadIDSV_GroupID滿足以下關系:

SV_DispatchThreadID.xyz = SV_GroupID.xyz * float3(ThreadDimX, ThreadDimY, ThreadDimZ) + SV_GroupThreadID.xyz;

共享內存和線程同步

在一個線程組內,允許設置一片共享內存區域,使得當前線程組內的所有線程都可以訪問當前的共享內存。一旦設置,那么每個線程都會各自配備一份共享內存。共享內存的訪問速度非常快,就像寄存器訪問CPU緩存那樣。

共享內存的聲明方式如下:

groupshared float4 g_Cache[256];

對於每個線程組來說,它所允許分配的總空間最大為32kb(即8192個標量,或2048個向量)。內部線程通常應該使用SV_ThreadGroupID來寫入共享內存,這樣以保證每個線程不會出現重復寫入操作,而讀取共享內存一般是線程安全的。

分配太多的共享內存會導致性能問題。假如一個多處理器支持32kb的共享內存,然后你的計算着色器需要20kb的共享內存,這意味着一個多處理器只適合處理一個線程組,因為剩余的共享內存不足以給新的線程組運行,這也會限制GPU的並行運算,當該線程組因為某些原因需要等待,會導致當前的多處理器處於閑置狀態。因此保證一個多處理器至少能夠處理兩個或以上的線程組(比如每個線程組分配16kb以下的共享內存),以盡可能減少該多處理器的閑置時間。

現在來考慮下面的代碼:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
	uint3 DTid : SV_DispatchThreadID)
{
	// 將紋理像素值緩存到共享內存
	g_Cache[GTid.x] = g_Input[DTid.xy];
	
	// 取出共享內存的值進行計算
	
	// 注意!!相鄰的兩個線程可能沒有完成對紋理的采樣
	// 以及存儲到共享內存的操作
	float left = g_Cache[GTid.x - 1];
	float right = g_Cache[GTid.x + 1];
	
	// ...
}

因為多個線程同時運行,同一時間各個線程當前執行的指令有所偏差,有的線程可能已經完成了共享內存的賦值操作,有的線程可能還在進行紋理采樣操作。如果當前線程正在讀取相鄰的共享內存片段,結果將是未定義的。為了解決這個問題,我們必須在讀取共享內存之前讓當前線程等待線程組內其它的所有線程完成寫入操作。這里我們可以使用GroupMemoryBarrierWithGroupSync函數:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
	uint3 DTid : SV_DispatchThreadID)
{
	// 將紋理像素值緩存到共享內存
	g_Cache[GTid.x] = g_Input[DTid.xy];
	
	// 等待所有線程完成寫入
	GroupMemoryBarrierWithGroupSync();
	
	// 現在讀取操作是線程安全的,可以開始進行計算
	float left = g_Cache[GTid.x - 1];
	float right = g_Cache[GTid.x + 1];
	
	// ...
}

雙調排序

雙調序列

所謂雙調序列(Bitonic Sequence),是指由一個非嚴格遞增序列X(允許相鄰兩個數相等)和非嚴格遞減序列Y構成的序列,比如序列\((5, 3, 2, 1, 4, 6, 6, 12)\)

定義:一個序列\(a_1 , a_2, ..., a_n\)是雙調序列,需要滿足下面條件:

  1. 存在一個\(a_k(1 <= k <= n)\),使得\(a_1 >= ... >= a_k <= ... <= a_n\)成立,或者\(a_1 <= ... <= a_k >= ... >= a_n\)成立;
  2. 序列循環移位后仍能夠滿足條件(1)

Batcher歸並網絡

Batcher歸並網絡是由一系列Batcher比較器組成的,Batcher比較器是指在兩個輸入端給定輸入值x和y,再在兩個輸出端輸出最大值\(max(x, y)\)和最小值\(min(x, y)\)

雙調歸並網絡

雙調歸並網絡是基於Batch定理而構建的。該定理是說將任意一個長為2n的雙調序列分為等長的兩半X和Y,將X中的元素與Y中的元素按原序比較,即\(a_i\)\(a_{i+n}(i <= n)\)比較,將較大者放入MAX序列,較小者放入MIN序列。則得到的MAX序列和MIN序列仍然是雙調序列,並且MAX序列中的任意一個元素不小於MIN序列中的任意一個元素。

根據這個原理,我們可以將一個n元素的雙調序列通過上述方式進行比較操作來得到一個MAX序列和一個MIN序列,然后對這兩個序列進行遞歸處理,直到序列不可再分割為止。最終歸並得到的為一個有序序列。

這里我們用一張圖來描述雙調排序的全過程:

其中箭頭方向指的是兩個數交換后,箭頭段的數為較大值,圓點段的數為較小值。

我們可以總結出如下規律:

  1. 每一趟排序結束會產生連續的雙調序列,除了最后一趟排序會產生我們所需要的單調序列
  2. 對於2^k個元素的任意序列,需要進行k趟排序才能產生單調序列
  3. 對於由\(2^{k-1}\)個元素的單調遞增序列和\(2^{k-1}\)個元素的單調遞減序列組成的雙調序列,需要進行k趟交換才能產生2^k個元素的單調遞增序列
  4. 在第n趟排序中的第m趟交換,若兩個比較數中較小的索引值為i,那么與之進行交換的數索引為\(i+2^{n-m}\)

雙調排序的空間復雜度為\(O(n)\),時間復雜度為\(O(n{(lg(n))}^2)\),看起來比\(O(nlg(n))\)系列的排序算法慢上一截,但是得益於GPU的並行計算,可以看作同一時間內有n個線程在運行,使得最終的時間復雜度可以降為\(O({(lg(n))}^2)\),效率又上了一個檔次。

需要注意的是,雙調排序要求排序元素的數目為\(2^k, (k>=1)\),如果元素個數為\(2^k < n < 2^{k+1}\),則需要填充數據到\(2^{k+1}\)個。若需要進行升序排序,則需要填充足夠的最大值;若需要進行降序排序,則需要填充足夠的最小值。

排序核心代碼實現

本HLSL實現參考了directx-sdk-samples,雖然里面的實現看起來比較簡潔,但是理解它的算法實現費了我不少的時間。個人以自己能夠理解的形式對它的實現進行了修改,因此這里以我這邊的實現版本來講解。

首先是排序需要用到的資源和常量緩沖區,定義在BitonicSort.hlsli

// BitonicSort.hlsli
Buffer<uint> g_Input : register(t0);
RWBuffer<uint> g_Data : register(u0);

cbuffer CB : register(b0)
{
    uint g_Level;        // 2^需要排序趟數
    uint g_DescendMask;  // 下降序列掩碼
    uint g_MatrixWidth;  // 矩陣寬度(要求寬度>=高度且都為2的倍數)
    uint g_MatrixHeight; // 矩陣高度
}

然后是核心的排序算法:

// BitonicSort_CS.hlsl
#include "BitonicSort.hlsli"

#define BITONIC_BLOCK_SIZE 512

groupshared uint shared_data[BITONIC_BLOCK_SIZE];

[numthreads(BITONIC_BLOCK_SIZE, 1, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    // 寫入共享數據
    shared_data[GI] = g_Data[DTid.x];
    GroupMemoryBarrierWithGroupSync();
    
    // 進行排序
    for (uint j = g_Level >> 1; j > 0; j >>= 1)
    {
        uint smallerIndex = GI & ~j;
        uint largerIndex = GI | j;
        bool isDescending = (bool) (g_DescendMask & DTid.x);
        bool isSmallerIndex = (GI == smallerIndex);
        uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
            shared_data[largerIndex] : shared_data[smallerIndex];
        GroupMemoryBarrierWithGroupSync();

        shared_data[GI] = result;
        GroupMemoryBarrierWithGroupSync();
    }
    
    // 保存結果
    g_Data[DTid.x] = shared_data[GI];
}

可以看到,我們實際上可以將遞歸過程轉化成迭代來實現。

現在我們先從核心排序算法講起,由於受到線程組的線程數目、共享內存大小限制,這里定義一個線程組包含512個線程,即一個線程組最大允許排序的元素數目為512。共享內存在這是用於臨時緩存中間排序的結果。

首先,我們需要將數據寫入共享內存中:

// 寫入共享數據
shared_data[GI] = g_Data[DTid.x];
GroupMemoryBarrierWithGroupSync();

接着就是要開始遞歸排序的過程,其中g_Level的含義為單個雙調序列的長度,它也說明了需要對該序列進行\(lg(g_Level)\)趟遞歸交換。

在一個線程中,我們僅知道該線程對應的元素,但現在我們還需要做兩件事情:

  1. 找到需要與該線程對應元素進行Batcher比較的另一個元素
  2. 判斷當前線程對應元素與另一個待比較元素相比,是較小索引還是較大索引

這里用到了位運算的魔法。先舉個例子,當前j為4,則待比較兩個元素的索引分別為2和6,這兩個索引值的區別在於索引2(二進制010)和索引6(二進制110),前者二進制第三位為0,后者二進制第三位為1.

但只要我們知道上述其中的一個索引,就可以求出另一個索引。較小索引值的索引可以通過屏蔽二進制的第三位得到,而較大索引值的索引可以通過按位或運算使得第三位為1來得到:

uint smallerIndex = GI & ~j;
uint largerIndex = GI | j;
bool isSmallerIndex = (GI == smallerIndex);

然后就是判斷當前元素是位於當前趟排序完成后的遞增序列還是遞減序列,比如序列\((4, 6, 4, 3, 5, 7, 2, 1)\),現在要進行第二趟排序,那么前后4個數將分別生成遞增序列和遞減序列,我們可以設置g_DescendMask的值為4(二進制100),這樣二進制索引范圍在100到111的值(對應十進制4-7)處在遞減序列,如果這個雙調序列長度為16,那么索引4-7和12-15的兩段序列都可以通過g_DescendMask來判斷出處在遞減序列:

bool isDescending = (bool) (g_DescendMask & DTid.x);

最后就是要確定當前線程對應的共享內存元素需要得到較小值,還是較大值了。這里又以一個雙調序列\((2, 5, 7, 4)\)為例,待比較的兩個元素為5和4,當前趟排序會將它變為單調遞增序列,即所處的序列為遞增序列,當前線程對應的元素為5,shared_data[smallerIndex] <= shared_data[largerIndex]的比較結果為>,那么它將拿到(較小值)較大索引的值。經過第一趟交換后將變成\((2, 4, 7, 5)\),第二趟交換就不討論了。

根據對元素所處序列、元素當前索引和比較結果的討論,可以產生出八種情況:

所處序列 當前索引 比較結果 取值結果
遞減 小索引 <= (較大值)較大索引的值
遞減 大索引 <= (較小值)較大索引的值
遞增 小索引 <= (較小值)較小索引的值
遞增 大索引 <= (較大值)較小索引的值
遞減 小索引 > (較大值)較小索引的值
遞減 大索引 > (較小值)較小索引的值
遞增 小索引 > (較小值)較大索引的值
遞增 大索引 > (較大值)較大索引的值

顯然現有的變量判斷較大/較小索引值比判斷較大值/較小值容易得多。上述結果表可以整理成下面的代碼:

uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
	shared_data[largerIndex] : shared_data[smallerIndex];
GroupMemoryBarrierWithGroupSync();

shared_data[GI] = result;
GroupMemoryBarrierWithGroupSync();

在C++中,現在有如下資源和着色器:

ComPtr<ID3D11Buffer> m_pConstantBuffer;				// 常量緩沖區
ComPtr<ID3D11Buffer> m_pTypedBuffer1;				// 有類型緩沖區1
ComPtr<ID3D11Buffer> m_pTypedBuffer2;				// 有類型緩沖區2
ComPtr<ID3D11Buffer> m_pTypedBufferCopy;			// 用於拷貝的有類型緩沖區
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV1;		// 有類型緩沖區1對應的無序訪問視圖
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV2;		// 有類型緩沖區2對應的無序訪問視圖
ComPtr<ID3D11ShaderResourceView> m_pDataSRV1;		// 有類型緩沖區1對應的着色器資源視圖
ComPtr<ID3D11ShaderResourceView> m_pDataSRV2;		// 有類型緩沖區2對應的着色器資源視圖

然后就是對512個元素進行排序的部分代碼(size為2的次冪):

void GameApp::SetConstants(UINT level, UINT descendMask, UINT matrixWidth, UINT matrixHeight);

//
// GameApp::GPUSort
//

m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

// 按行數據進行排序,先排序level <= BLOCK_SIZE 的所有情況
for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
{
	SetConstants(level, level, 0, 0);
	m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
}

給更多的數據排序

上述代碼允許我們對元素個數為2到512的序列進行排序,但緩沖區的元素數目必須為2的次冪。由於在CS4.0中,一個線程組最多允許一個線程組包含768個線程,這意味着雙調排序僅允許在一個線程組中對最多512個元素進行排序。

接下來我們看一個例子,假如有一個16元素的序列,然而線程組僅允許包含最多4個線程,那我們將其放置在一個4x4的矩陣內:

然后對矩陣轉置:

可以看到,通過轉置后,列數據變換到行數據的位置,這樣我們就可以進行跨度更大的交換操作了。處理完大跨度的交換后,我們再轉置回來,處理行數據即可。

現在假定我們已經對行數據排完序,下圖演示了剩余的排序過程:

但是在線程組允許最大線程數為4的情況下,通過二維矩陣最多也只能排序16個數。。。。也許可以考慮三維矩陣轉置法,這樣就可以排序64個數了哈哈哈。。。

不過還有一個情況我們要考慮,就是元素數目不為(2x2)的倍數,無法構成一個方陣,但我們也可以把它變成對兩個方陣轉置。這時矩陣的寬是高的兩倍:

由於元素個數為32,它的最大索引跨度為16,轉置后的索引跨度為2,不會越界到另一個方陣進行比較。但是當g_Level到32時,此時進行的是單調排序,g_DescendMask也必須設為最大值32(而不是4),避免產生雙調序列。

通過下面的轉置算法,使得原本只能排序512個數的算法現在可以排序最大262144個數了。

負責轉置的着色器實現如下:

// MatrixTranspose_CS.hlsl
#include "BitonicSort.hlsli"

#define TRANSPOSE_BLOCK_SIZE 16

groupshared uint shared_data[TRANSPOSE_BLOCK_SIZE * TRANSPOSE_BLOCK_SIZE];

[numthreads(TRANSPOSE_BLOCK_SIZE, TRANSPOSE_BLOCK_SIZE, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    uint index = DTid.y * g_MatrixWidth + DTid.x;
    shared_data[GI] = g_Input[index];
    GroupMemoryBarrierWithGroupSync();
	
    uint2 outPos = DTid.yx % g_MatrixHeight + DTid.xy / g_MatrixHeight * g_MatrixHeight;
    g_Data[outPos.y * g_MatrixWidth + outPos.x] = shared_data[GI];
}

最后是GPU排序用的函數:

#define BITONIC_BLOCK_SIZE 512

#define TRANSPOSE_BLOCK_SIZE 16

void GameApp::GPUSort()
{
	UINT size = (UINT)m_RandomNums.size();

	m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
	m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

	// 按行數據進行排序,先排序level <= BLOCK_SIZE 的所有情況
	for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
	{
		SetConstants(level, level, 0, 0);
		m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
	}
	
	// 計算相近的矩陣寬高(寬>=高且需要都為2的次冪)
	UINT matrixWidth = 2, matrixHeight = 2;
	while (matrixWidth * matrixWidth < size)
	{
		matrixWidth *= 2;
	}
	matrixHeight = size / matrixWidth;

	// 排序level > BLOCK_SIZE 的所有情況
	ComPtr<ID3D11ShaderResourceView> pNullSRV;
	for (UINT level = BITONIC_BLOCK_SIZE * 2; level <= size; level *= 2)
	{
		// 如果達到最高等級,則為全遞增序列
		if (level == size)
		{
			SetConstants(level / matrixWidth, level, matrixWidth, matrixHeight);
		}
		else
		{
			SetConstants(level / matrixWidth, level / matrixWidth, matrixWidth, matrixHeight);
		}
		// 先進行轉置,並把數據輸出到Buffer2
		m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
		m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
		m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV2.GetAddressOf(), nullptr);
		m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV1.GetAddressOf());
		m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE, 
			matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

		// 對Buffer2排序列數據
		m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
		m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);

		// 接着轉置回來,並把數據輸出到Buffer1
		SetConstants(matrixWidth, level, matrixWidth, matrixHeight);
		m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
		m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
		m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);
		m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV2.GetAddressOf());
		m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE,
			matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

		// 對Buffer1排序剩余行數據
		m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
		m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);
	}
}

最后是std::sort和雙調排序(使用NVIDIA GTX 850M)的比較結果:

元素數目 CPU****排序(s) GPU****排序(s) GPU****排序+寫回內存總時(s)
512 0.000020 0.000003 0.000186
1024 0.000043 0.000007 0.000226
2048 0.000102 0.000009 0.000310
4096 0.000245 0.000009 0.000452
8192 0.000512 0.000010 0.000771
16384 0.001054 0.000012 0.000869
32768 0.002114 0.000012 0.001819
65536 0.004448 0.000014 0.002625
131072 0.009600 0.000015 0.005130
262144 0.021555 0.000021 0.008983
524288 0.048621 0.000030 0.015652
1048576 0.111219 0.000044 0.040927
2097152 0.257118 0.000072 0.188662
4194304 0.444995 0.000083 0.300240
8388608 0.959173 0.000124 0.465746
16777216 2.178453 0.000177 0.915898
33554432 5.869439 0.000361 2.269989

可以初步看到雙調排序的排序用時比較穩定,而快速排序明顯隨元素數目增長而變慢。當然,如果GPU排序的數據量再更大一些的話,可以看到時間的明顯增長。

但是!如果你用GPU排序后需要取回到CPU,因為GPU->CPU的速度比較慢,並且需要等待資源結束占用才能開始取出,故需要排較大數目的數據(約10000以上)才有明顯效益

DirectX11 With Windows SDK完整目錄

Github項目源碼

歡迎加入QQ群: 727623616 可以一起探討DX11,以及有什么問題也可以在這里匯報。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM