[充電]多線程無鎖編程--原子計數操作:__sync_fetch_and_add等12個操作


轉自:http://blog.csdn.net/minCrazy/article/details/40791795

多線程間計數操作、共享狀態或者統計相關時間次數,這些都需要在多線程之間共享變量和修改變量,如此就需要在多線程間對該變量進行互斥操作和訪問。

        通常遇到多線程互斥的問題,首先想到的就是加鎖lock,通過加互斥鎖來進行線程間互斥,但是最近有看一些開源的項目,看到有一些同步讀和操作的原子操作函數——__sync_fetch_and_add系列的命令,然后自己去網上查找一番,找到一篇博文有介紹這系列函數,學習一番后記錄下來。

 

首先,C/C++程序中count++這種操作不是原子的,一個自加操作,本質上分為3步:

  1. 從緩存取到寄存器
  2. 在寄存器內加1
  3. 再存入緩存
但是由於時序的因素,多線程操作同一個全局變量,就會出現很多問題。這就是多線程並發編程的難點,尤其隨着計算機硬件技術的快速發展,多CPU多核技術更彰顯出這種困難。

通常,最簡單的方法就是加鎖保護,互斥鎖(mutex),這也是我使用最多的解決方案。大致代碼如下:
pthread_mutex_t lock;
pthread_mutex_init(&lock,...);

pthread_mutex_lock(&lock);
count++;
pthread_mutex_unlock(&lock);

后來,在一些C/C++開源項目中,看到通過__sync_fetch_and_add一系列命令進行原子性操作,隨后就在網上查閱相關資料,發現有很多博客都有介紹這系列函數。

__sync_fetch_and_add系列一共有12個函數,分別:加/減/與/或/異或等原子性操作函數,__sync_fetch_and_add,顧名思義,先fetch,返回自加前的值。舉例說明,count = 4,調用__sync_fetch_and_add(&count, 1)之后,返回值是4,但是count變成5。同樣,也有__sync_add_and_fetch,先自加,然后返回自加后的值。這樣對應的關系,與i++和++i的關系是一樣的。

gcc從4.1.2開始提供了__sync_*系列的build-in函數,用於提供加減和邏輯運算的原子操作,其聲明如下:

type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)

type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)

上述12個函數即為所有,通過函數名字就可以知道函數的作用。需要注意的是,這個type不能亂用(type只能是int, long, long long以及對應的unsigned類型),同時在用gcc編譯的時候要加上選項 -march=i686。
后面的可擴展參數(...)用來指出哪些變量需要memory barrier,因為目前gcc實現的是full barrier(類似Linux kernel中的mb(),表示這個操作之前的所有內存操作不會被重排到這個操作之后),所以可以忽略掉這個參數。

下面簡單介紹一下__sync_fetch_and_add反匯編出來的指令(實際上,這部分我還不是很懂,都是從其他博客上摘錄的)
804889d:f0 83 05 50 a0 04 08 lock addl $0x1,0x804a050
可以看到,addl前面有一個lock,這行匯編指令前面是f0開頭,f0叫做指令前綴,Richard Blum。lock前綴的意思是對內存區域的排他性訪問。

其實,lock是鎖FSB,前端串行總線,Front Serial Bus,這個FSB是處理器和RAM之間的總線,鎖住FSB,就能阻止其他處理器或者Core從RAM獲取數據。當然這種操作開銷相當大,只能操作小的內存可以這樣做,想想我們有memcpy,如果操作一大片內存,鎖內存,那么代價太大了。所以前面介紹__sync_fetch_and_add等函數,type只能是int, long, long long以及對應的unsigned類型。

此外,還有兩個類似的原子操作,
bool __sync_bool_compare_and_swap(type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap(type *ptr, type oldval, type newval, ...)

這兩個函數提供原子的比較和交換,如果*ptr == oldval,就將newval寫入*ptr,
第一個函數在相等並寫入的情況下返回true;
第二個函數在返回操作之前的值。
 
type __sync_lock_test_and_set(type *ptr, type value, ...)
將*ptr設為value並返回*ptr操作之前的值;
void __sync_lock_release(type *ptr, ...)
將*ptr置為0
 
有了這些寶貝函數,對於多線程對全局變量進行操作(自加、自減等)問題,我們就不用考慮線程鎖,可以考慮使用上述函數代替,和使用pthread_mutex保護的作用是一樣的,線程安全且性能上完爆線程鎖。
 
下面是對線程鎖和原子操作使用對比,並且進行性能測試與對比。代碼來自於文獻【1】,弄懂后並稍微改動一點點。代碼中分別給出加鎖、加線程鎖、原子計數操作三種情況的比較。
[cpp]  view plain  copy
 
  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <unistd.h>  
  4. #include <errno.h>  
  5. #include <pthread.h>  
  6. #include <sched.h>  
  7. #include <linux/unistd.h>  
  8. #include <sys/syscall.h>  
  9. #include <linux/types.h>  
  10. #include <time.h>  
  11. #include <sys/time.h>  
  12.   
  13. #define INC_TO 1000000 // one million  
  14.   
  15. __u64 rdtsc ()  
  16. {  
  17.     __u32 lo, hi;  
  18.     __asm__ __volatile__  
  19.     (  
  20.        "rdtsc":"=a"(lo),"=d"(hi)  
  21.     );  
  22.   
  23.     return (__u64)hi << 32 | lo;  
  24. }  
  25.   
  26. int global_int = 0;  
  27.   
  28. pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;//初始化互斥鎖  
  29.   
  30. pid_t gettid ()  
  31. {  
  32.     return syscall(__NR_gettid);  
  33. }  
  34.   
  35. void * thread_routine1 (void *arg)  
  36. {  
  37.     int i;  
  38.     int proc_num = (int)(long)arg;  
  39.       
  40.     __u64 begin, end;  
  41.     struct timeval tv_begin, tv_end;  
  42.     __u64 time_interval;  
  43.       
  44.     cpu_set_t set;  
  45.       
  46.     CPU_ZERO(&set);  
  47.     CPU_SET(proc_num, &set);  
  48.   
  49.     if (sched_setaffinity(gettid(), sizeof(cpu_set_t), &set))  
  50.     {  
  51.         fprintf(stderr, "failed to set affinity\n");  
  52.         return NULL;  
  53.     }  
  54.     begin = rdtsc();  
  55.     gettimeofday(&tv_begin, NULL);  
  56.     for (i = 0; i < INC_TO; i++)  
  57.     {  
  58.         __sync_fetch_and_add(&global_int, 1);  
  59.     }  
  60.     gettimeofday(&tv_end, NULL);  
  61.     end = rdtsc();  
  62.     time_interval = (tv_end.tv_sec - tv_begin.tv_sec) * 1000000 + (tv_end.tv_usec - tv_begin.tv_usec);  
  63.     fprintf(stderr, "proc_num : %d, __sync_fetch_and_add cost %llu CPU cycle, cost %llu us\n", proc_num, end - begin, time_interval);  
  64.       
  65.     return NULL;  
  66. }  
  67.   
  68. void *thread_routine2(void *arg)  
  69. {  
  70.     int i;  
  71.     int proc_num = (int)(long)arg;  
  72.   
  73.     __u64 begin, end;  
  74.     struct timeval tv_begin, tv_end;  
  75.     __u64 time_interval;  
  76.       
  77.     cpu_set_t set;  
  78.       
  79.     CPU_ZERO(&set);  
  80.     CPU_SET(proc_num, &set);  
  81.   
  82.     if (sched_setaffinity(gettid(), sizeof(cpu_set_t), &set))  
  83.     {  
  84.         fprintf(stderr, "failed to set affinity\n");  
  85.         return NULL;  
  86.     }  
  87.     begin = rdtsc();  
  88.     gettimeofday(&tv_begin, NULL);  
  89.     for (i = 0; i < INC_TO; i++)  
  90.     {  
  91.         pthread_mutex_lock(&count_lock);  
  92.         global_int++;  
  93.         pthread_mutex_unlock(&count_lock);  
  94.     }  
  95.     gettimeofday(&tv_end, NULL);  
  96.     end = rdtsc();  
  97.     time_interval = (tv_end.tv_sec - tv_begin.tv_sec) * 1000000 + (tv_end.tv_usec - tv_begin.tv_usec);  
  98.     fprintf(stderr, "proc_num : %d, pthread_mutex_lock cost %llu CPU cycle, cost %llu us\n", proc_num, end - begin, time_interval);  
  99.       
  100.     return NULL;    
  101. }  
  102.   
  103. void *thread_routine3(void *arg)  
  104. {  
  105.     int i;  
  106.     int proc_num = (int)(long)arg;  
  107.   
  108.     __u64 begin, end;  
  109.     struct timeval tv_begin, tv_end;  
  110.     __u64 time_interval;  
  111.       
  112.     cpu_set_t set;  
  113.       
  114.     CPU_ZERO(&set);  
  115.     CPU_SET(proc_num, &set);  
  116.   
  117.     if (sched_setaffinity(gettid(), sizeof(cpu_set_t), &set))  
  118.     {  
  119.         fprintf(stderr, "failed to set affinity\n");  
  120.         return NULL;  
  121.     }  
  122.     begin = rdtsc();  
  123.     gettimeofday(&tv_begin, NULL);  
  124.     for (i = 0; i < INC_TO; i++)  
  125.     {  
  126.         global_int++;  
  127.     }  
  128.     gettimeofday(&tv_end, NULL);  
  129.     end = rdtsc();  
  130.     time_interval = (tv_end.tv_sec - tv_begin.tv_sec) * 1000000 + (tv_end.tv_usec - tv_begin.tv_usec);  
  131.     fprintf(stderr, "proc_num : %d, no lock cost %llu CPU cycle, cost %llu us\n", proc_num, end - begin, time_interval);  
  132.       
  133.     return NULL;  
  134. }  
  135.   
  136. int main()  
  137. {  
  138.     int procs = 0;  
  139.     int all_cores = 0;  
  140.     int i;  
  141.     pthread_t *thrs;  
  142.   
  143.     procs = (int)sysconf(_SC_NPROCESSORS_ONLN);  
  144.     if (procs < 0)  
  145.     {  
  146.         fprintf(stderr, "failed to fetch available CPUs(Cores)\n");  
  147.         return -1;  
  148.     }  
  149.     all_cores = (int)sysconf(_SC_NPROCESSORS_CONF);  
  150.     if (all_cores < 0)  
  151.     {  
  152.         fprintf(stderr, "failed to fetch system configure CPUs(Cores)\n");  
  153.         return -1;  
  154.     }  
  155.       
  156.     printf("system configure CPUs(Cores): %d\n", all_cores);  
  157.     printf("system available CPUs(Cores): %d\n", procs);  
  158.   
  159.     thrs = (pthread_t *)malloc(sizeof(pthread_t) * procs);  
  160.     if (thrs == NULL)  
  161.     {  
  162.         fprintf(stderr, "failed to malloc pthread array\n");  
  163.         return -1;  
  164.     }  
  165.       
  166.     printf("starting %d threads...\n", procs);  
  167.       
  168.     for (i = 0; i < procs; i++)  
  169.     {  
  170.         if (pthread_create(&thrs[i], NULL, thread_routine1, (void *)(long) i))  
  171.         {  
  172.             fprintf(stderr, "failed to pthread create\n");  
  173.             procs = i;  
  174.             break;  
  175.         }  
  176.     }  
  177.   
  178.     for (i = 0; i < procs; i++)  
  179.     {  
  180.         pthread_join(thrs[i], NULL);  
  181.     }  
  182.     
  183.     printf("after doing all the math, global_int value is: %d\n", global_int);  
  184.     printf("expected value is: %d\n", INC_TO * procs);  
  185.   
  186.     free (thrs);  
  187.       
  188.     return 0;  
  189. }  
 
         運行結果如下:
         每次修改不同thread_routine?()函數,重新編譯即可測試不同情況。
         g++ main.cpp -D _GNU_SOURCE -l pthread
         ./a.out
 
         不加鎖下運行結果:
[plain]  view plain  copy
 
  1. system configure CPUs(Cores): 8  
  2. system available CPUs(Cores): 8  
  3. starting 8 threads...  
  4. proc_num : 5, no lock cost 158839371 CPU cycle, cost 66253 us  
  5. proc_num : 6, no lock cost 163866879 CPU cycle, cost 68351 us  
  6. proc_num : 2, no lock cost 173866203 CPU cycle, cost 72521 us  
  7. proc_num : 7, no lock cost 181006344 CPU cycle, cost 75500 us  
  8. proc_num : 1, no lock cost 186387174 CPU cycle, cost 77728 us  
  9. proc_num : 0, no lock cost 186698304 CPU cycle, cost 77874 us  
  10. proc_num : 3, no lock cost 196089462 CPU cycle, cost 81790 us  
  11. proc_num : 4, no lock cost 200366793 CPU cycle, cost 83576 us  
  12. after doing all the math, global_int value is: 1743884  
  13. expected value is: 8000000  

          線程鎖下運行結果:
[plain]  view plain  copy
 
  1. system configure CPUs(Cores): 8  
  2. system available CPUs(Cores): 8  
  3. starting 8 threads...  
  4. proc_num : 1, pthread_mutex_lock cost 9752929875 CPU cycle, cost 4068121 us  
  5. proc_num : 5, pthread_mutex_lock cost 10038570354 CPU cycle, cost 4187272 us  
  6. proc_num : 7, pthread_mutex_lock cost 10041209091 CPU cycle, cost 4188374 us  
  7. proc_num : 0, pthread_mutex_lock cost 10044102546 CPU cycle, cost 4189546 us  
  8. proc_num : 6, pthread_mutex_lock cost 10113533973 CPU cycle, cost 4218541 us  
  9. proc_num : 4, pthread_mutex_lock cost 10117540197 CPU cycle, cost 4220212 us  
  10. proc_num : 3, pthread_mutex_lock cost 10160384391 CPU cycle, cost 4238083 us  
  11. proc_num : 2, pthread_mutex_lock cost 10164464784 CPU cycle, cost 4239778 us  
  12. after doing all the math, global_int value is: 8000000  
  13. expected value is: 8000000  

         原子操作__sync_fetch_and_add下運行結果:
[plain]  view plain  copy
 
  1. system configure CPUs(Cores): 8  
  2. system available CPUs(Cores): 8  
  3. starting 8 threads...  
  4. proc_num : 3, __sync_fetch_and_add cost 2364148575 CPU cycle, cost 986129 us  
  5. proc_num : 1, __sync_fetch_and_add cost 2374990974 CPU cycle, cost 990652 us  
  6. proc_num : 2, __sync_fetch_and_add cost 2457930267 CPU cycle, cost 1025247 us  
  7. proc_num : 5, __sync_fetch_and_add cost 2463027030 CPU cycle, cost 1027373 us  
  8. proc_num : 7, __sync_fetch_and_add cost 2532240981 CPU cycle, cost 1056244 us  
  9. proc_num : 4, __sync_fetch_and_add cost 2555055054 CPU cycle, cost 1065760 us  
  10. proc_num : 0, __sync_fetch_and_add cost 2561248971 CPU cycle, cost 1068331 us  
  11. proc_num : 6, __sync_fetch_and_add cost 2558781396 CPU cycle, cost 1067314 us  
  12. after doing all the math, global_int value is: 8000000  
  13. expected value is: 8000000  
通過測試結果可以看出:
 
        1. 不加鎖的情況下,不能獲得正確結果。

                測試結果表明,正確結果為8000000,而實際為1743884。表明多線程下修改全局計數,不加鎖的話是錯誤的;

        2. 加鎖情況下,無論是線程鎖還是原子性操作,均可獲得正確結果。

        3. 性能上__sync_fetch_and_add()完爆線程鎖。

                從性能測試結果上看,__sync_fetch_and_add()速度大致是線程鎖的4-5倍。

        

 

測試結果對比
類型 平均CPU周期(circle) 平均耗時(us)
不加鎖 180890066 75449.13
線程鎖 10054091901 4193740.875
原子操作 2483427906 1035881.25

 

 

注:如上的性能測試結果,表明__sync_fetch_and_add()速度大致是線程鎖的4-5倍,而並非文獻【1】中6-7倍。由此,懷疑可能是由不同機器、不同CPU導致的,上述測試是在一台8core的虛擬機上實驗的。為此,我又在不同的機器上重復相同的測試。

         24cores實體機測試結果,表明__sync_fetch_and_add()速度大致只有線程鎖的2-3倍。

 

24 cores實體機測試結果
類型 平均CPU周期(circle) 平均耗時(us)
不加鎖 535457026 233310.5
線程鎖 9331915480 4066156.667
原子操作 3769900795 1643463.625

 

       總體看來,原子操作__sync_fetch_and_add()大大的優於線程鎖。

 

另外:

       上面介紹的原子操作參數里都有可擴展參數(...)用來指出哪些變量需要memory barrier,因為目前gcc實現的是full barrier(類似Linux kernel中的mb(),表示這個操作之前的所有內存操作不會被重排到這個操作之后),所以可以忽略掉這個參數。下面是有關memory barrier的東西。

        關於memory barrier, cpu會對我們的指令進行排序,一般說來會提高程序的效率,但有時候可能造成我們不希望看到的結果。舉例說明,比如我們有一硬件設備,當你發出一個操作指令的時候,一個寄存器存的是你的操作指令(READ),兩個寄存器存的是參數(比如地址和size),最后一個寄存器是控制寄存器,在所有的參數都設置好后向其發出指令,設備開始讀取參數,執行命令,程序可能如下:
             write1(dev.register_size, size);
             write1(dev.register_addr, addr);
             write1(dev.register_cmd, READ);
             write1(dev.register_control, Go);

       如果CPU對我們的指令進行優化排序,導致最后一條write1被換到前幾條語句之前,那么肯定不是我們所期望的,這時候我們可以在最后一條語句之前加入一個memory barrier,強制CPU執行完前面的寫入后再執行最后一條:
             write1(dev.register_size, size);
             write1(dev.register_addr, addr);
             write1(dev.register_cmd, READ);
             __sync_synchronize();            發出一個full barrier
             write1(dev.register_control, GO);
 
memory barrier有幾種類型:
      acquire barrier:不允許將barrier之后的內存讀取指令移到barrier之前;(linux kernel中的wmb)
      release barrier:不允許將barrier之前的內存讀取指令移到barrier之后;(linux kernel中的rmb)
      full barrier:以上兩種barrier的合集;(linux kernel中的mb)
 
參考文獻:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM