深入淺出 Barriers 實現(一)


Barriers,字面意思為“壁壘,屏障,柵欄”,在計算機領域中 Barriers 也有它獨特的含義,具體來講,在並行程序中,Barriers 是一種同步的手段,可被視為一種線程同步原語,如一組線程/進程的 Barrier 可以用來同步該線程/進程組,只有當該線程/進程組中所有線程到達屏障點(可稱之為同步點)時,整個程序才得以繼續執行。如比較熟悉的 Memory Barriers(Wikipedia),Memory Barriers可翻譯為內存屏障。由於現代絕大多數的CPU都采用流水線和亂序執行(out-of-order),一條指令的執行一般分為:取指,譯碼,訪存,執行,寫回的若干個階段,多條指令可能同時存在與流水線中並同時被執行,由於CPU具備亂序執行,往往后取的指令被提前執行了,還有,在多處理器的條件下,CPU 可能共享 Cache,此時就會發生 Cache 不一致的情況,再加上編譯器優化,問題就變得很復雜了,如何保證指令執行的一致性呢?內存屏障就發揮了很大作用。在 Linux 內核中,最簡單的內存屏障原語是(更多有關內核屏障請參看文檔):

#define barrier() __asm__ __volatile__("":::"memory")

barrier() 的作用就是告訴編譯器,內存中變量的值已經改變了,之前保存與寄存器或cache中的變量副本無效,如果訪問該變量需要直接去內存中讀取。內存屏障主要有:讀屏障,寫屏障,優化屏障,通用屏障等,以讀屏障為例,它用於保證讀操作有序,屏障之前的讀操作一定會由於屏障之后的讀操作完成,寫屏障與此類似,只是用於限制寫操作。優化屏障用於限制編譯器的指令重排,關於優化屏障,可以參考以下系列文章,1, 2, 3

好了,上面講的內容只是開始,現在就讓我們來一步一步動手實現並優化我們自己的 Barriers 吧 :-)

前面講過,Barriers 是一種基本的同步原語,它可以告訴你一組線程在什么時候完成了各自的任務可以接下來進行其他的工作,即一旦所有的線程都到達了屏障點,它們才能夠繼續執行下去,否則先到達屏障點的線程就會在此處等待其他線程的到來,鑒於此,屏障操作也是一個相當重量級的同步操作。但是,很多並行算法都可以通過屏障來把復雜的計算任務分階段執行后進行同步,以此來簡化算法的設計。

pthread 庫本身支持 pthread_barrier_t 數據類型,並且提供了pthread_barrier_init, pthread_barrier_destroy, pthread_barrier_wait API, 為了使用 pthread 的 barrier, 你必須先初始化 barrier,參數為需要同步的線程數目(以及其他可選的屬性,如 barrier 是用於線程間的同步還是線程內的同步),然后各個線程通過調用 pthread_barrier_wait 來進行同步,最后調用 pthread_barrier_destroy 銷毀創建的 barrier。

銷毀 barrier 原語需要特殊處理,而許多其他的同步原語可能根本就不需要銷毀操作,因為他們並不分配內存,並且其依賴 的futex 系統調用也不需要顯式地釋放資源, 雖然 barrier 也不分配內存同時也使用了 futex 系統調用( 基於 Linux 的實現),但是仍然需要一些工作要做。

但問題是,barrier 銷毀的銷毀操作可能發生在其他線程離開 barriers 時,那些線程需要檢查 barrier 的狀態,如果此時 barrier 使用的內存被釋放了,那么程序的行為就不確定了,不過大部分情況下都可能發生 segfault。因此,銷毀 barrier 的線程需要等待其他的線程都退出了 barrier 后才能執行銷毀操作。為了達到該要求,我們需要做一些額外的同步,不過幸運的是,我們可以使用條件變量和互斥量來完成該工作,本文稍后會給出一個最簡單的例子。

我們可以通過 pthread 的互斥量和條件變量來實現簡單的 barrier,大致代碼如下:

  •  pthread-barrier.h
*
 * =============================================================================
 *
 *       Filename:  pthread-barrier.h
 *
 *    Description:  pthread barrier implementation.
 *
 *        Created:  12/26/2012 11:08:06 AM
 *
 *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
 *        Company:  ICT ( Institute Of Computing Technology, CAS )
 *
 * =============================================================================
 */
#ifndef _PTHREAD_BARRIER_H_
#define _PTHREAD_BARRIER_H_

#include <pthread.h>

typedef struct barrier_s_ barrier_t;

struct barrier_s_ {
    unsigned count;
    unsigned total;
    pthread_mutex_t m;
    pthread_cond_t cv;
};

#define BARRIER_FLAG (1UL<<31)

extern void barrier_init(barrier_t *b, unsigned count);

extern int barrier_wait(barrier_t *b);

extern void barrier_destroy(barrier_t *b);

#endif /* _PTHREAD_BARRIER_H_ */
  • pthread-barrier.c
/*
 * =============================================================================
 *
 *       Filename:  pthread-barrier.c
 *
 *    Description:  pthread barrier implementation.
 *
 *        Created:  12/26/2012 11:08:14 AM
 *
 *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
 *        Company:  ICT ( Institute Of Computing Technology, CAS )
 *
 * =============================================================================
 */

#include "pthread-barrier.h"

void barrier_destroy(barrier_t *b)
{
    pthread_mutex_lock(&b->m);

    while (b->total > BARRIER_FLAG)
    {
        /* 等待所有線程退出 barrier. */
        pthread_cond_wait(&b->cv, &b->m);
    }

    pthread_mutex_unlock(&b->m);

    pthread_cond_destroy(&b->cv);
    pthread_mutex_destroy(&b->m);
}

void barrier_init(barrier_t *b, unsigned count)
{
    pthread_mutex_init(&b->m, NULL);
    pthread_cond_init(&b->cv, NULL);
    b->count = count;
    b->total = BARRIER_FLAG;
}

int barrier_wait(barrier_t *b)
{
    pthread_mutex_lock(&b->m);

    while (b->total > BARRIER_FLAG)
    {
        pthread_cond_wait(&b->cv, &b->m);
    }

    /* 是否為第一個到達 barrier 的線程? */
    if (b->total == BARRIER_FLAG) b->total = 0;

    b->total++;

    if (b->total == b->count)
    {
        b->total += BARRIER_FLAG - 1;
        pthread_cond_broadcast(&b->cv);

        pthread_mutex_unlock(&b->m);

        return PTHREAD_BARRIER_SERIAL_THREAD;
    }
    else
    {
        while (b->total < BARRIER_FLAG)
        {
            pthread_cond_wait(&b->cv, &b->m);
        }

        b->total--;

        /* 喚醒所有進入 barrier 的線程. */
        if (b->total == BARRIER_FLAG) pthread_cond_broadcast(&b->cv);

        pthread_mutex_unlock(&b->m);

        return 0;
    }
}

 測試程序如下:

#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#include "pthread-barrier.h"
#include "futex-barrier.h"
#include "ticket-barrier.h"
#include "fast-barrier.h"

// #define USE_MUTEX_CV_BARRIER

#if defined(USE_MUTEX_CV_BARRIER)
  #define pthread_barrier_t barrier_t
  #define pthread_barrier_init(B, A, N) (barrier_init((B), (N)), 0)
  #define pthread_barrier_destroy(B) (barrier_destroy(B), 0)
  #define pthread_barrier_wait barrier_wait

#endif

/* Number of threads to use */
#define N 2

/* Number of times to call barrier primitive */
#define COUNT 1000000

static pthread_barrier_t barrier;

static void *do_barrier_bench(void *arg)
{
    int i;

    (void) arg;

    for (i = 0; i < COUNT; i++)
    {
        pthread_barrier_wait(&barrier);
    }

    return NULL;
}

int main(void)
{
    pthread_t th[N - 1];
    clock_t start, end;
    int i;

    pthread_barrier_init(&barrier, NULL, N);

    for (i = 0; i < N - 1; i++) {
        if (pthread_create(&th[i], NULL, do_barrier_bench, NULL)) {
            fprintf(stderr, "pthread_create failed");
            exit(EXIT_FAILURE);
        }
    }
    start = clock();

    do_barrier_bench(NULL);

    end = clock();

    pthread_barrier_destroy(&barrier);

    for (i = 0; i < N - 1; i++) {
        if (pthread_join(th[i], NULL)) {
            fprintf(stderr, "pthread_join failed");
            exit(EXIT_FAILURE);
        }
    }

    puts("bench OK");

    printf("time elasped %lds.\n", (end - start) / CLOCKS_PER_SEC);
    return 0;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM