鎖&鎖與指令原子操作的關系 & cas_Queue


  鎖以及信號量對大部分人來說都是非常熟悉的,特別是常用的mutex。鎖有很多種,互斥鎖,自旋鎖,讀寫鎖,順序鎖,等等,這里就只介紹常見到的,

    互斥鎖

      這個是最常用的,win32:CreateMutex-WaitForSingleObject-ReleaseMutex,linux的pthread_mutex_lock-pthread_mutex_unlock,c#的lock和Monitor,java的lock,這些都是互斥鎖。互斥鎖的作用大家都知道,是讓一段代碼同時只能有一個線程運行,

    自旋鎖

      不常用,linux的pthread_spin系列函數就是自旋鎖,(網上很多用原子操作寫的自旋鎖),作用和互斥鎖大同小異。

    信號量

      win下的CreateSemaphore、OpenSemaphore、ReleaseSemaphore、WaitForSingleObject,linux也有同樣的semaphore系列,還有c#的AutoResetEvent或者semaphore。這個用的也很多,信號兩個狀態,阻塞和通過,作用是保證多線程代碼的業務順序!

  先嘮一嘮這些鎖的原理,(為什么我把信號量也歸結於鎖?)

    首先互斥鎖,互斥鎖實際上是由原子操作來實現的,

    比如,當變量A為0的時候為非鎖,為1的時候為鎖,當第一個線程將變量A從0變為1(原子操作)成功的時候,就相當於獲取鎖成功了,另外的線程再次獲取鎖的時候發現A為1了,(或者說兩個線程同時獲取鎖->原子操作,某一個會失敗),表示獲取鎖失敗,當第一個線程用完了,就釋放鎖,將A=0(原子操作)。

    互斥鎖的特點是,當鎖獲取失敗了,當前代碼上下文(線程)會休眠,並且把當前線程添加到這個內核維護的互斥鎖的鏈表里,當后面的鎖再次獲取失敗,也是將當前線程和執行信息放到這個鏈表里。當前占用的互斥鎖的人用完了鎖,內核會抽取互斥鎖等待鏈表上的下一個線程開始喚醒繼續執行,當內核鏈表上為空,就是沒人搶鎖了,就將鎖狀態設置為非鎖,以次類推~

    然后呢,我們講自旋鎖,自旋鎖很簡單,他和互斥鎖大同小異,區別就是不休眠,當獲取鎖失敗了,就一直while(獲取),一直到成功,所以,自旋鎖在大部分場景都是不適用的,因為獲取鎖的時間里,cpu一直是100%的!!

    最后講信號量,上面問為什么我將信號量也歸結於鎖這一類?

    因為信號量也是原子操作來實現的!道理和互斥鎖一樣的信號量也有一個鏈表,當等待信號的時候,系統也是把當前線程休眠,把線程和代碼執行信息存儲到這個信號量的鏈表里,當內核接受到信號的時候,就把這個信號量上的所有等待線程激活運行,這就是信號量!

原子操作

    到底什么是原子操作?

    百度百科  所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。

    所以,原子操作保證了多個線程對內存操作某個值得准確性!那么原子操作具體如何實現的?

    首先是inter cpu,熟悉匯編的人都知道,inter指令集有個lock,如果某個指令集前面加個lock,那么在多核狀態下,某個核執行到這個前面加lock的指令的時候,inter會讓總線鎖住,當這個核把這個指令執行完了,再開啟總線!這是一種最最底層的鎖!!

    比如  lock cmpxchg dword ptr [rcx],edx  cmpxchg這個指令就被加鎖了!

    inter指令參考可查閱http://www.intel.cn/content/www/cn/zh/processors/architectures-software-developer-manuals.html

    來自IA-32券3:

    HLT 指令(停止處理器)停止處理器直至接收到一個啟用中斷(比如 NMI 或 SMI,正 常情況下這些都是開啟的)、調試異常、BINIT#信號、INIT#信號或 RESET#信號。處理 器產生一個特殊的總線周期以表明進入停止模式。 硬件對這個信號的響應有好幾個方面。前面板上的指示燈會打亮,產生一個記錄 診斷信息的 NMI 中斷,調用復位初始化過程(注意 BINIT#引腳是在 Pentium Pro 處理器 引入的)。如果停機過程中有非喚醒事件(比如 A20M#中斷)未處理,它們將在喚醒停 機事件處理之后的進行處理。

    在修改內存操作時,使用 LOCK 前綴去調用加鎖的讀-修改-寫操作(原子的)。這種 機制用於多處理器系統中處理器之間進行可靠的通訊,具體描述如下: 在 Pentium 和早期的 IA-32 處理器中,LOCK 前綴會使處理器執行當前指令時產生 一個 LOCK#信號,這總是引起顯式總線鎖定出現。 在 Pentium 4、Intel Xeon 和 P6 系列處理器中,加鎖操作是由高速緩存鎖或總線 鎖來處理。如果內存訪問有高速緩存且只影響一個單獨的高速緩存線,那么操作中就 會調用高速緩存鎖,而系統總線和系統內存中的實際內存區域不會被鎖定。同時,這 條總線上的其它 Pentium 4、Intel Xeon 或者 P6 系列處理器就回寫所有的已修改數據 並使它們的高速緩存失效,以保證系統內存的一致性。如果內存訪問沒有高速緩存且/ 或它跨越了高速緩存線的邊界,那么這個處理器就會產生 LOCK#信號,並在鎖定操作期 間不會響應總線控制請求。

    IA-32 處理器提供有一個 LOCK#信號,會在某些關鍵內存操作期間被自動激活,去鎖定系統總線。當這個輸出信號發出的時候,來自其它處理器或總線代理的總線控制請求將被阻塞。軟件能夠通過預先在指令前添加 LOCK 前綴來指定需要 LOCK 語義的其它場合。在 Intel386、Intel486、Pentium 處理器中,明確地對指令加鎖會導致 LOCK#信號的產生。由硬件設計人員來保證系統硬件中 LOCK#信號的可用性,以控制處理器間的內IA-32 架構軟件開發人員指南 卷 3:系統編程指南170存訪問。對於 Pentium 4、Intel Xeon 以及 P6 系列處理器,如果被訪問的內存區域是在處理器內部進行高速緩存的,那么通常不發出 LOCK#信號;相反,加鎖只應用於處理器的高速緩存(參見 7.1.4.LOCK 操作對處理器內部高速緩存的影響) 。

    可參考inter的 IA-32券3 第七章第一小節!

    當然inter還有其他方式保證原子操作!

    然后是ARM cpu, arm主要是靠兩個指令來保證原子操作的,LDREX 和 STREX

    LDREX
      LDREX 可從內存加載數據。

      如果物理地址有共享 TLB 屬性,則 LDREX 會將該物理地址標記為由當前處理器獨占訪問,並且會清除該處理器對其他任何物理地址的任何獨占訪問標記。

        否則,會標記:執行處理器已經標記了一個物理地址,但訪問尚未完畢。

    STREX
      STREX 可在一定條件下向內存存儲數據。 條件具體如下:

      如果物理地址沒有共享 TLB 屬性,且執行處理器有一個已標記但尚未訪問完畢的物理地址,那么將會進行存儲,清除該標記,並在Rd 中返回值 0。

      如果物理地址沒有共享 TLB 屬性,且執行處理器也沒有已標記但尚未訪問完畢的物理地址,那么將不會進行存儲,而會在Rd 中返回值 1。

      如果物理地址有共享 TLB 屬性,且已被標記為由執行處理器獨占訪問,那么將進行存儲,清除該標記,並在Rd 中返回值 0。

      如果物理地址有共享 TLB 屬性,但沒有標記為由執行處理器獨占訪問,那么不會進行存儲,且會在Rd 中返回值 1。

    參考:http://blog.csdn.net/duanlove/article/details/8212123

 

原子CAS操作

原子操作指令里,有原子加,原子減,cas到底是什么呢?

首先看一段代碼,

bool compare_and_swap(int *accum, int *dest, int newval)
{
  if (*accum == *dest) {
      *dest = newval;
      return true;
  } else {
      *accum = *dest;
      return false;
  }
}

 

  cas即是Compare-and-swap,先比較再互換,即修改,意思就是,當reg等oldvalue的時候,將reg設置為newval,這段代碼在非原子情況下(多線程)是沒用的,但是如果這段代碼是原子操作,那么他的威力就非常大, 互斥鎖就和這個cas有關,

  上面我們也看到inter這個指令了,lock cmpxchgcmpxchg作用就是cas這個函數的作用比較並交換操作數,這就是cas原子操作,神奇吧,上面一個函數的作用,被inter一個指令搞定了,再cmpxchg前面加一個lock,那么這就是一個真正發揮威力的cas!

    在win32內核中有個InterlockedCompareExchange函數,這個函數就是cas功能,在inter cpu上的實現就是這段指令=》lock cmpxchg!

    linux下有__sync_bool_compare_and_swap 和 __sync_val_compare_and_swap 。

     在dotnet下有 interlocked.compareexchange。java參考sun.misc.Unsafe類。

CAS操作,到底有什么威力?

    如果要修改一個變量,在多線程下,應該要加鎖,代碼是這樣的

int num = 0;
void add()
{
	lock();
	num = num + 123;
	unlock();
}

 

    但是如果不要鎖,cas來操作??

int num = 0;
void add()
{
	int temp;
	do
	{
		temp = num;
	} 
	while (cas(num, temp, temp+123)==true)
}

  我們看到用一個do while來無限判斷cas的修改結果,如果修改完成,那就成功+1,如果cas沒有修改成功,繼續while,temp將獲取最新的num,再次cas操作!

  當一個線程的時候,num一個人操作,不會出現差錯,當兩個人的時候,某個人先進行cas原子操作,num+1,第二個線程拿着舊值去加操作,返現返回的就是false,於是重新復制temp獲取最新的num,這就是cas的核心價值!無鎖!

  cas其實這也算一種鎖,樂觀鎖!相同於自旋鎖也循環!

   貼下cas互斥鎖的代碼(自己寫的),當然也可以去用原子+-來判斷,反正都是原子操作~~

int i = 0;//0非鎖,1鎖住
//嘗試獲取鎖,當cas返回失敗,獲取鎖失敗,返回true,獲取鎖成功 獲取失敗就休眠,等待系統喚醒
bool lock()
{
	return cas(i, 0, 1);
}
bool unlock()
{
	return cas(i, 1, 0);
}

 

CAS無鎖Queue

    簡單發下我寫的cas環形隊列,很簡單的!

// .h

#pragma once

#ifndef _cas_queue
#define _cas_queue

#ifndef C_BOOL
#define C_BOOL

typedef int cbool;
#define false 0  
#define true  1

#endif

//
//typedef struct _cas_queue
//{
//	int size;
//} cas_queue;

#define QUEUE_SIZE 65536



#ifdef __cplusplus
extern "C" {
#endif
/*
compare and swap: CAS(*ptr,outvalue,newvalue);
return bool
*/

	cbool compare_and_swap(void ** ptr,long outvalue,long newvalue);

	void cas_queue_init(int queue_size);

	void cas_queue_free();

	cbool  cas_queue_try_enqueue(void * p);

	cbool cas_queue_try_dequeue(void ** p);


#ifdef __cplusplus
}
#endif

#endif


//.c
#include "cas_queue.h"

#ifdef _MSC_VER
#include <windows.h>
#else

#endif

volatile unsigned long read_index = 0;
volatile unsigned long write_index = 0;

long* read_index_p = &read_index;
long* write_index_p = &write_index;

void** ring_queue_buffer_head;

int ring_queue_size = QUEUE_SIZE;

cbool is_load = 0;

cbool compare_and_swap(void * ptr, long outvalue, long newvalue)
{
#ifdef _MSC_VER  // vs
	long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue);
	return return_outvalue == outvalue;
	/*InterlockedCompareExchange64 No success!!*/
	//#ifndef _WIN64 
	//	//32 bit
	//	long return_outvalue = InterlockedCompareExchange(ptr, newvalue, outvalue);
	//	return return_outvalue == outvalue;
	//#else
	//	//64 bit
	//	long return_outvalue = InterlockedCompareExchange64(ptr, newvalue, outvalue);
	//	return return_outvalue == outvalue;
	//#endif
#else
	//linux
#endif

}

void cas_queue_init(int queue_size)
{
	if (queue_size > 0)
		ring_queue_size = queue_size;
	int size = sizeof(void**)*ring_queue_size;
	ring_queue_buffer_head = malloc(size);
	memset(ring_queue_buffer_head, 0, size);
	is_load = 1;
	read_index = 0;
	write_index = 0;
}

void cas_queue_free()
{
	is_load = 0;
	free(ring_queue_buffer_head);
}

cbool cas_queue_try_enqueue(void * p)
{
	if (!is_load)
		return false;
	unsigned long index;
	do
	{
		//queue full
		if (read_index != write_index && read_index%ring_queue_size == write_index%ring_queue_size)
			return false;
		index = write_index;
	} while (compare_and_swap(&write_index, index, index + 1) != true);
	ring_queue_buffer_head[index%ring_queue_size] = p;

	return true;
}
cbool cas_queue_try_dequeue(void ** p)
{
	if (!is_load)
		return false;
	unsigned long index;
	do
	{
		//queue empty
		if (read_index == write_index)
			return false;
		index = read_index;
	} while (compare_and_swap(read_index_p, index, index + 1) != true);
	*p = ring_queue_buffer_head[index%ring_queue_size];
	return true;
}

    具體我測試過,在4個線程情況下,80萬個消息,同時入和出,出完只需要150毫秒左右!當然線程過多而且集火的話肯定會慢的。

這個demo不是很實用,看下篇:CAS原子鎖 高效自旋無鎖的正確用法

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM