CUDA Parallel Algorithm Series: Reduction


Preface

Reduction is a class of parallel algorithms that takes N input values and, using a binary associative operator ⊕, produces a single result. Operations in this class include minimum, maximum, sum, sum of squares, logical AND/OR, and vector dot product. Reduction is also an important building block for other, higher-level parallel algorithms.

Unless the operator ⊕ is extremely expensive to evaluate, reduction tends to be a bandwidth-bound task. This article walks through several reduction implementations, starting from a two-pass reduction that requires the block size to be a power of two and optimizing step by step toward a single-pass reduction that works with an arbitrary number of threads. It also discusses optimizations based on C++ templates.

Reduction

For N input values and the operator ⊕, the reduction can be expressed as:

a_0 ⊕ a_1 ⊕ a_2 ⊕ ⋯ ⊕ a_{N-1}

The figure below shows several implementations of a reduction over 8 elements:

(Figure: implementations of an 8-element reduction)

As the figure shows, the implementations differ in time complexity. The serial implementation needs 7 steps to finish and performs poorly. The pairwise approach is classic divide and conquer and needs only lg N steps, but because its memory transactions cannot be coalesced it also performs poorly in CUDA.
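
For reference, the 7-step serial case in the figure corresponds to the trivial CPU loop below. This is only a minimal sketch to contrast with the parallel versions (and it can double as a correctness reference); the function name reduceSerial is illustrative, not from the original code:

#include <cstddef>

// Serial reduction: N - 1 applications of the operator (addition here).
int reduceSerial(const int *a, size_t N)
{
    int result = 0;
    for (size_t i = 0; i < N; ++i)
    {
        result += a[i];
    }
    return result;
}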

In CUDA, an interleaved (strided) addressing strategy works better for both global and shared memory. For global memory, using a stride that is a multiple of blockDim.x * gridDim.x gives good performance because all memory transactions are coalesced. For shared memory, the best performance comes from accumulating partial results with a stride chosen to avoid bank conflicts and to keep adjacent threads of a block active.
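
The interleaved global-memory pattern described above is simply a grid-stride loop. A minimal sketch of the access pattern is shown below (the kernel name gridStrideSum and the output array perThreadSum are illustrative names, not from the original code; the reduction kernels that follow use exactly this loop shape):

// Each thread starts at its global index and advances by the total number of
// threads in the grid (blockDim.x * gridDim.x), so neighbouring threads read
// neighbouring addresses and every global-memory transaction is coalesced.
__global__ void gridStrideSum(int *perThreadSum, const int *in, size_t N)
{
    int sum = 0;
    for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < N;
         i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    perThreadSum[blockIdx.x * blockDim.x + threadIdx.x] = sum;
}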

Two-Pass Reduction

The algorithm has two passes, both of which invoke the same kernel. The first pass performs NumBlocks parallel reductions, where NumBlocks is the number of thread blocks, and produces an array of intermediate results. The second pass reduces this intermediate array with a single thread block, yielding the final result. The algorithm is illustrated below:

reduction

Suppose 768 input values are to be reduced and the first pass is launched with 2 blocks of 256 threads each. The kernel then performs two parallel reductions and stores the results in the intermediate array partial, which has size 2: partial[0] holds the result of block 0 and partial[1] holds the result of block 1. The second pass reduces partial with a single block, so one value is produced, and that value is the reduction of the whole input.

The reduction uses the interleaved strategy. The two-pass reduction code is as follows:

// Two-pass reduction
__global__ void reduction1_kernel(int *out, const int *in, size_t N)
{
    // shared memory length = number of threads per block (blockDim.x)
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;
    // grid-stride loop: each thread accumulates its portion of the input
    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    // log-step reduction in shared memory (requires blockDim.x to be a power of 2)
    for (int activeThreads = blockDim.x / 2; activeThreads > 0; activeThreads /= 2)
    {
        if (tid < activeThreads)
        {
            sPartials[tid] += sPartials[tid + activeThreads];
        }
        __syncthreads();
    }

    if (tid == 0)
    {
        out[blockIdx.x] = sPartials[0];
    }
}

void reduction1(int *answer, int *partial, const int *in, const size_t N, const int numBlocks, int numThreads)
{
    unsigned int sharedSize = numThreads * sizeof(int);

    // first pass: one partial result per block
    reduction1_kernel<<<numBlocks, numThreads, sharedSize>>>(partial, in, N);
    // second pass: a single block reduces the partial results
    reduction1_kernel<<<1, numThreads, sharedSize>>>(answer, partial, numBlocks);
}

The amount of shared memory equals the number of threads per block (times sizeof(int)) and is specified at launch time. Note that this kernel requires the block size to be a power of two; handling data of arbitrary size is covered later in this article.
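
For context, here is a minimal host-side driver that allocates the buffers and calls reduction1(). It is an illustrative sketch only (the buffer names and the 256-thread / 64-block launch configuration are assumptions, not taken from the original code), with error checking omitted:

#include <cuda_runtime.h>
#include <cstdio>
#include <vector>

int main()
{
    const size_t N = 1024 * 1024;
    const int numThreads = 256;   // must be a power of two for this kernel
    const int numBlocks  = 64;    // arbitrary; each block strides over the input

    std::vector<int> h_in(N, 1);  // reduce N ones, so the expected sum is N

    int *d_in, *d_partial, *d_answer;
    cudaMalloc(&d_in, N * sizeof(int));
    cudaMalloc(&d_partial, numBlocks * sizeof(int));  // one partial result per block
    cudaMalloc(&d_answer, sizeof(int));
    cudaMemcpy(d_in, h_in.data(), N * sizeof(int), cudaMemcpyHostToDevice);

    reduction1(d_answer, d_partial, d_in, N, numBlocks, numThreads);

    int h_answer = 0;
    cudaMemcpy(&h_answer, d_answer, sizeof(int), cudaMemcpyDeviceToHost);
    printf("sum = %d\n", h_answer);  // should print 1048576

    cudaFree(d_in);
    cudaFree(d_partial);
    cudaFree(d_answer);
    return 0;
}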

CUDA groups threads into warps (currently 32 threads). A warp executes on SIMD hardware, and the threads of a warp within a block execute each instruction in lockstep, so once the number of active threads in a block drops below the hardware warp size, __syncthreads() is no longer needed for synchronization. Note, however, that warp-synchronous code must access shared memory through a pointer qualified with volatile; otherwise compiler optimizations may reorder the memory operations and produce incorrect results. The warp-optimized code is as follows:

// Two-pass reduction with a warp-synchronous final stage
__global__ void reduction1_kernel(int *out, const int *in, size_t N)
{
    // shared memory length = number of threads per block (blockDim.x)
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;
    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    for (int activeThreads = blockDim.x / 2; activeThreads > 32; activeThreads /= 2)
    {
        if (tid < activeThreads)
        {
            sPartials[tid] += sPartials[tid + activeThreads];
        }
        __syncthreads();
    }

    // warp-synchronous reduction (assumes blockDim.x is a power of 2 and >= 64,
    // so the reads below stay within the shared-memory allocation)
    if (tid < 32)
    {
        volatile int *wsSum = sPartials;
        if (blockDim.x > 32)
        {
            wsSum[tid] += wsSum[tid + 32];
        }

        wsSum[tid] += wsSum[tid + 16];
        wsSum[tid] += wsSum[tid + 8];
        wsSum[tid] += wsSum[tid + 4];
        wsSum[tid] += wsSum[tid + 2];
        wsSum[tid] += wsSum[tid + 1];

        if (tid == 0)
        {
            out[blockIdx.x] = wsSum[0];
        }
    }
}

By turning the thread count into a template parameter, the loop can be fully unrolled for a further optimization. The unrolled code is as follows:

// Two-pass reduction, fully unrolled via a template parameter
template<unsigned int numThreads>
__global__ void reduction1_kernel(int *out, const int *in, size_t N)
{
    // shared memory length = number of threads per block (numThreads)
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;
    for (size_t i = blockIdx.x * numThreads + tid; i < N; i += numThreads * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    if (numThreads >= 1024)
    {
        if (tid < 512) sPartials[tid] += sPartials[tid + 512];
        __syncthreads();
    }
    if (numThreads >= 512)
    {
        if (tid < 256) sPartials[tid] += sPartials[tid + 256];
        __syncthreads();
    }
    if (numThreads >= 256)
    {
        if (tid < 128) sPartials[tid] += sPartials[tid + 128];
        __syncthreads();
    }
    if (numThreads >= 128)
    {
        if (tid < 64) sPartials[tid] += sPartials[tid + 64];
        __syncthreads();
    }

    if (tid < 32)
    {
        volatile int *wsSum = sPartials;
        if (numThreads >= 64) wsSum[tid] += wsSum[tid + 32];
        if (numThreads >= 32) wsSum[tid] += wsSum[tid + 16];
        if (numThreads >= 16) wsSum[tid] += wsSum[tid + 8];
        if (numThreads >= 8) wsSum[tid] += wsSum[tid + 4];
        if (numThreads >= 4) wsSum[tid] += wsSum[tid + 2];
        if (numThreads >= 2) wsSum[tid] += wsSum[tid + 1];

        if (tid == 0)
        {
            out[blockIdx.x] = wsSum[0];
        }
    }
}

To instantiate the template, a separate dispatch function is needed:

template<unsigned int numThreads>
void reduction1_template(int *answer, int *partial, const int *in, const size_t N, const int numBlocks)
{
    unsigned int sharedSize = numThreads * sizeof(int);

    // kernel execution
    reduction1_kernel<numThreads><<<numBlocks, numThreads, sharedSize>>>(partial, in, N);
    reduction1_kernel<numThreads><<<1, numThreads, sharedSize>>>(answer, partial, numBlocks);
}

void reduction1t(int *answer, int *partial, const int *in, const size_t N, const int numBlocks, int numThreads)
{
    switch (numThreads)
    {
        case 1: reduction1_template<1>(answer, partial, in, N, numBlocks); break;
        case 2: reduction1_template<2>(answer, partial, in, N, numBlocks); break;
        case 4: reduction1_template<4>(answer, partial, in, N, numBlocks); break;
        case 8: reduction1_template<8>(answer, partial, in, N, numBlocks); break;
        case 16: reduction1_template<16>(answer, partial, in, N, numBlocks); break;
        case 32: reduction1_template<32>(answer, partial, in, N, numBlocks); break;
        case 64: reduction1_template<64>(answer, partial, in, N, numBlocks); break;
        case 128: reduction1_template<128>(answer, partial, in, N, numBlocks); break;
        case 256: reduction1_template<256>(answer, partial, in, N, numBlocks); break;
        case 512: reduction1_template<512>(answer, partial, in, N, numBlocks); break;
        case 1024: reduction1_template<1024>(answer, partial, in, N, numBlocks); break;
    }
}

Reduction with an Arbitrary Block Size

The kernels above require the number of threads per block to be a power of two; otherwise the result is incorrect. They can be adapted to arbitrary sizes: before the reduction loop, the array being reduced (sPartials in shared memory) is simply folded down to a power-of-two length, as described below.

This is similar to rounding down: for an array whose length L is not a power of two, take the largest power of two smaller than L (call it floorPow2), then fold every element whose index is at least floorPow2 into position index - floorPow2 using the ⊕ operator, where index is the element's original index.

If N & (N - 1) == 0, then N is a power of two; otherwise keep computing N &= (N - 1) until N & (N - 1) == 0, at which point N is floorPow2.
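
As a standalone illustration of this bit trick, here is a small host-side sketch (not part of the kernels; the function name floorPow2 is reused here only for clarity):

#include <cstdio>

// Round n down to the largest power of two <= n by repeatedly clearing
// the lowest set bit until only a single bit remains.
unsigned int floorPow2(unsigned int n)
{
    while (n & (n - 1))
    {
        n &= (n - 1);
    }
    return n;
}

int main()
{
    // prints: 512 1024 64
    printf("%u %u %u\n", floorPow2(768), floorPow2(1024), floorPow2(100));
    return 0;
}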

The modified two-pass reduction is as follows:

// Two-pass reduction for an arbitrary block size
__global__ void reduction1_kernel(int *out, const int *in, size_t N)
{
    // shared memory length = number of threads per block (blockDim.x)
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;
    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    // fold sPartials down to a power-of-two length
    unsigned int floorPow2 = blockDim.x;
    if (floorPow2 & (floorPow2 - 1))
    {
        while (floorPow2 & (floorPow2 - 1))
        {
            floorPow2 &= (floorPow2 - 1);
        }
        if (tid >= floorPow2)
        {
            sPartials[tid - floorPow2] += sPartials[tid];
        }
        __syncthreads();
    }

    for (int activeThreads = floorPow2 / 2; activeThreads > 32; activeThreads /= 2)
    {
        if (tid < activeThreads)
        {
            sPartials[tid] += sPartials[tid + activeThreads];
        }
        __syncthreads();
    }

    if (tid < 32)
    {
        volatile int *wsSum = sPartials;
        if (floorPow2 > 32)
        {
            wsSum[tid] += wsSum[tid + 32];
        }

        if (floorPow2 > 16) wsSum[tid] += wsSum[tid + 16];
        if (floorPow2 > 8) wsSum[tid] += wsSum[tid + 8];
        if (floorPow2 > 4) wsSum[tid] += wsSum[tid + 4];
        if (floorPow2 > 2) wsSum[tid] += wsSum[tid + 2];
        if (floorPow2 > 1) wsSum[tid] += wsSum[tid + 1];

        if (tid == 0)
        {
            out[blockIdx.x] = wsSum[0];
        }
    }
}

void reduction1(int *answer, int *partial, const int *in, const size_t N, const int numBlocks, int numThreads)
{
    unsigned int sharedSize = numThreads * sizeof(int);

    // kernel execution
    reduction1_kernel<<<numBlocks, numThreads, sharedSize>>>(partial, in, N);
    reduction1_kernel<<<1, numThreads, sharedSize>>>(answer, partial, numBlocks);
}

The same change applies, of course, to the unrolled template version:

template<unsigned int numThreads>
__global__ void reduction1_kernel(int *out, const int *in, size_t N)
{
    // shared memory length = number of threads per block (numThreads)
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;
    for (size_t i = blockIdx.x * numThreads + tid; i < N; i += numThreads * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    // fold sPartials down to a power-of-two length
    unsigned int floorPow2 = blockDim.x;
    if (floorPow2 & (floorPow2 - 1))
    {
        while (floorPow2 & (floorPow2 - 1))
        {
            floorPow2 &= (floorPow2 - 1);
        }
        if (tid >= floorPow2)
        {
            sPartials[tid - floorPow2] += sPartials[tid];
        }
        __syncthreads();
    }

    if (floorPow2 >= 1024)
    {
        if (tid < 512) sPartials[tid] += sPartials[tid + 512];
        __syncthreads();
    }
    if (floorPow2 >= 512)
    {
        if (tid < 256) sPartials[tid] += sPartials[tid + 256];
        __syncthreads();
    }
    if (floorPow2 >= 256)
    {
        if (tid < 128) sPartials[tid] += sPartials[tid + 128];
        __syncthreads();
    }
    if (floorPow2 >= 128)
    {
        if (tid < 64) sPartials[tid] += sPartials[tid + 64];
        __syncthreads();
    }

    if (tid < 32)
    {
        volatile int *wsSum = sPartials;
        if (floorPow2 >= 64) wsSum[tid] += wsSum[tid + 32];
        if (floorPow2 >= 32) wsSum[tid] += wsSum[tid + 16];
        if (floorPow2 >= 16) wsSum[tid] += wsSum[tid + 8];
        if (floorPow2 >= 8) wsSum[tid] += wsSum[tid + 4];
        if (floorPow2 >= 4) wsSum[tid] += wsSum[tid + 2];
        if (floorPow2 >= 2) wsSum[tid] += wsSum[tid + 1];

        if (tid == 0)
        {
            out[blockIdx.x] = wsSum[0];
        }
    }
}

Single-Pass Reduction Using Atomic Operations

The code above needs two kernel launches. Because the CPU and GPU execute asynchronously, this is in fact quite efficient; still, a single-pass reduction is possible. The two-pass approach exists mainly to work around the fact that CUDA thread blocks cannot synchronize with one another; combining atomic operations with shared memory avoids the second kernel launch altogether.

If the hardware supports an atomic version of the operator ⊕, the single-pass reduction becomes very simple. For addition, the partial results only need to be added to a location in global memory with atomicAdd():

__global__ void reduction2_kernel(int *out, const int *in, size_t N)
{
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;

    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    atomicAdd(out, sum);
}

void reduction2(int *answer, int *partial, const int *in, const size_t N, const int numBlocks, int numThreads)
{
    unsigned int sharedSize = numThreads * sizeof(int);
    cudaMemset(answer, 0, sizeof(int));

    reduction2_kernel<<<numBlocks, numThreads, sharedSize>>>(answer, in, N);
}

Of course, to reduce the number of global-memory writes and the contention on the atomic, each block can first perform a shared-memory reduction to obtain its partial result and then add that single value to global memory with an atomic operation, so each block issues only one global write and one atomicAdd(). The kernel is:

__global__ void reduction2_kernel(int *out, const int *in, size_t N)
{
    extern __shared__ int sPartials[];
    int sum = 0;
    const int tid = threadIdx.x;

    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    unsigned int floorPow2 = blockDim.x;
    if (floorPow2 & (floorPow2 - 1))
    {
        while(floorPow2 & (floorPow2 - 1))
        {
            floorPow2 &= (floorPow2 - 1);
        }
        if (tid >= floorPow2)
        {
            sPartials[tid - floorPow2] += sPartials[tid];
        }
        __syncthreads();
    }

    for (int activeThreads = floorPow2 / 2; activeThreads > 32; activeThreads /= 2)
    {
        if (tid < activeThreads)
        {
            sPartials[tid] += sPartials[tid + activeThreads];
        }
        __syncthreads();
    }

    if (tid < 32)
    {
        volatile int *wsSum = sPartials;
        if (floorPow2 > 32) wsSum[tid] += wsSum[tid + 32];
        if (floorPow2 > 16) wsSum[tid] += wsSum[tid + 16];
        if (floorPow2 > 8) wsSum[tid] += wsSum[tid + 8];
        if (floorPow2 > 4) wsSum[tid] += wsSum[tid + 4];
        if (floorPow2 > 2) wsSum[tid] += wsSum[tid + 2];
        if (floorPow2 > 1) wsSum[tid] += wsSum[tid + 1];
        if (tid == 0)
        {
            atomicAdd(out, wsSum[0]);
        }
    }
}

Single-Pass Reduction Without Atomic Operations

When the operator ⊕ has no atomic support, the single-pass reduction is more involved. A location in device (global) memory tracks which thread blocks have finished writing their partial results; once all blocks are done, a single block performs the final reduction and writes back the result.

Since the shared-memory reduction has to be performed more than once, it is factored out into a device function so it can be reused. reduction3_logStepShared() reduces the values in sPartials (in shared memory) and writes the result to the location pointed to by out (this function could also be unrolled with templates):

__device__ void reduction3_logStepShared(int *out, volatile int *sPartials)
{
    const int tid = threadIdx.x;
    int floorPow2 = blockDim.x;

    // fold sPartials down to a power-of-two length
    if (floorPow2 & (floorPow2 - 1))
    {
        while (floorPow2 & (floorPow2 - 1))
        {
            floorPow2 &= (floorPow2 - 1);
        }
        if (tid >= floorPow2)
        {
            sPartials[tid - floorPow2] += sPartials[tid];
        }
        __syncthreads();
    }

    for (int activeThreads = floorPow2 / 2; activeThreads > 32; activeThreads /= 2)
    {
        if (tid < activeThreads)
        {
            sPartials[tid] += sPartials[tid + activeThreads];
        }
        __syncthreads();
    }

    if (tid < 32)
    {
        if (floorPow2 > 32) sPartials[tid] += sPartials[tid + 32];
        if (floorPow2 > 16) sPartials[tid] += sPartials[tid + 16];
        if (floorPow2 > 8) sPartials[tid] += sPartials[tid + 8];
        if (floorPow2 > 4) sPartials[tid] += sPartials[tid + 4];
        if (floorPow2 > 2) sPartials[tid] += sPartials[tid + 2];
        if (floorPow2 > 1) sPartials[tid] += sPartials[tid + 1];

        if (tid == 0)
        {
            *out = sPartials[0];
        }
    }
}

The single-pass kernel is as follows:

__device__ unsigned int retirementCount = 0;
__global__ void reduction3_kernel(int *out, int *partial, const int *in, size_t N)
{
    extern __shared__ int sPartials[];
    const int tid = threadIdx.x;
    int sum = 0;

    for (size_t i = blockIdx.x * blockDim.x + tid; i < N; i += blockDim.x * gridDim.x)
    {
        sum += in[i];
    }
    sPartials[tid] = sum;
    __syncthreads();

    if (gridDim.x == 1)
    {
        reduction3_logStepShared(out, sPartials);
        return;
    }
    reduction3_logStepShared(&partial[blockIdx.x], sPartials);

    __shared__ bool lastBlock;
    __threadfence();

    if (tid == 0)
    {
        unsigned int ticket = atomicAdd(&retirementCount, 1);
        lastBlock = (ticket == gridDim.x - 1);
    }

    __syncthreads();

    if (lastBlock)
    {
        sum = 0;
        for (size_t i = tid; i < gridDim.x; i += blockDim.x)
        {
            sum += partial[i];
        }
        sPartials[tid] = sum;
        __syncthreads();

        reduction3_logStepShared(out, sPartials);
        retirementCount = 0;
    }
}

void reduction3(int *answer, int *partial, const int *in, const size_t N, const int numBlocks, int numThreads)
{
    unsigned int sharedSize = numThreads * sizeof(int);
    reduction3_kernel<<<numBlocks, numThreads, sharedSize>>>(answer, partial, in, N);
}

Like the previous kernels, this one first computes per-thread partial sums and writes them to shared memory. After that, two cases must be handled: a single block and multiple blocks.

With a single block there is no inter-block synchronization problem; calling reduction3_logStepShared() directly produces the final result.

With multiple blocks things are more complicated. Blocks cannot communicate with each other directly, so a variable in global memory, retirementCount, tracks how many blocks have finished. Thread 0 of each block calls atomicAdd() to increment retirementCount; since atomicAdd() returns the value before the increment, the last block to finish sets lastBlock to true. That last block then reduces the partial results of all blocks once more, producing the final result.

The kernel also calls __threadfence(). This makes the calling thread wait until its pending memory transactions have been flushed to device memory; once __threadfence() has executed, those writes to global memory are visible not only to the calling thread or its block, but to all threads.

Testing

Thrust already provides a reduction, so thrust can be used to verify that the results are correct, while CUDA events record the running time to measure performance. The test code is as follows:

    cudaEvent_t start, stop;
    checkCudaErrors( cudaEventCreate(&start) );
    checkCudaErrors( cudaEventCreate(&stop) );
    cudaEventRecord(start);

    for (int i = 0; i < cIterations; ++i)
    {
        func(answer, partial, in, N, numBlocks, numThreads);
    }

    cudaEventRecord(stop);
    checkCudaErrors( cudaThreadSynchronize() );
    float time = 0;
    cudaEventElapsedTime(&time, start, stop);
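
The thrust verification mentioned above is not shown in this snippet; a minimal sketch of how it might look is given below (the function name verifyWithThrust and the assumption that d_in is the device input pointer are illustrative, not taken from the original test code):

#include <thrust/device_ptr.h>
#include <thrust/functional.h>
#include <thrust/reduce.h>

// Compute a reference sum on the device with thrust and compare it
// against the answer produced by the reduction kernels.
int verifyWithThrust(const int *d_in, size_t N)
{
    thrust::device_ptr<const int> p(d_in);
    return thrust::reduce(p, p + N, 0, thrust::plus<int>());
}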

The complete code can be found on GitHub: algorithms-cuda

Test Results

The tests reduce 1024*1024 values in the two environments below, running each kernel 100 times. The results are as follows:

Test environment 1

GPU GeForce GT 750M
CUDA v7.5
OS Mac OS 10.11.5

Results:

blocks: 1024 threads: 1024
simple loop time: 253.916ms host answer (1048576) = device answer (1048576)
simple loop template time: 144.405ms host answer (1048576) = device answer (1048576)
atomicAdd time: 132.541ms host answer (1048576) = device answer (1048576)
atomicAdd template time: 115.428ms host answer (1048576) = device answer (1048576)
single pass time: 1724.43ms host answer (1048576) = device answer (1048576)
single pass template time: 1190.27ms host answer (1048576) = device answer (1048576)

Test environment 2

GPU GeForce GTX 750 Ti
CUDA v7.5
OS Ubuntu 14.04

Results:

blocks: 1024 threads: 1024
simple loop time: 28.8353ms host answer (1048576) = device answer (1048576)
simple loop template time: 30.3201ms host answer (1048576) = device answer (1048576)
atomicAdd time: 29.7131ms host answer (1048576) = device answer (1048576)
atomicAdd template time: 29.4682ms host answer (1048576) = device answer (1048576)
single pass time: 39.1131ms host answer (1048576) = device answer (1048576)
single pass template time: 33.0042ms host answer (1048576) = device answer (1048576)

Conclusion

This article covered reduction algorithms in CUDA, implementing a two-pass reduction, a single-pass reduction based on atomic operations, and a single-pass reduction without atomics, and discussed template-based loop unrolling as well as support for inputs of arbitrary size. Finally, test code was presented and run in two environments.

The results show that template-based loop unrolling is a clear win on the GT 750M but brings little improvement on the GTX 750 Ti. Limited by the cost of __threadfence() and the atomic operations, the single-pass reductions are not faster than the two-pass version on either of these GPUs; on the GT 750M, however, removing the __threadfence() call improves performance dramatically (from 1724.43 ms to 143.424 ms). On higher-end GPUs the single-pass reduction should beat the two-pass version; I currently have no such device to test on and will update the results once I do.

The code for this article is available on GitHub: algorithms-cuda

References

  1. Nicholas Wilt. The CUDA Handbook: A Comprehensive Guide to GPU Programming [M]. (Chinese edition: CUDA專家手冊——GPU編程權威指南, 機械工業出版社, 2014.)
  2. Kirk D B, Wen-mei W H. Programming Massively Parallel Processors: A Hands-on Approach [M]. Newnes, 2012.
  3. thrust. https://github.com/thrust/thrust/tree/master/thrust

