本文分析的原代碼版本: 2.6.24.4
kfifo的定義文件: kernel/kfifo.c
kfifo的頭文件: include/linux/kfifo.h
kfifo是內核里面的一個First In First Out數據結構,它采用環形循環隊列的數據結構來實現,提供一個無邊界的字節流服務,並且使用並行無鎖編程技術,即當它用於只有一個入隊線程和一個出隊線程的場情時,兩個線程可以並發操作,而不需要任何加鎖行為,就可以保證kfifo的線程安全。
下文着重於代碼剖析,各部分代碼后面有關鍵點說明,同時可參考注釋進行理解:
struct kfifo { unsigned char *buffer; /* the buffer holding the data : 用於存放數據的緩存 */ unsigned int size; /* the size of the allocated buffer : 空間的大小,在初化時將它向上擴展成2的冪,為了高效的進行與操作取余,后面會詳解 */ unsigned int in; /* data is added at offset (in % size) : 如果使用不能保證任何時間最多只有一個讀線程和寫線程,需要使用該lock實施同步*/ unsigned int out; /* data is extracted from off. (out % size) :一起構成一個循環隊列。 in指向buffer中隊頭,而且out指向buffer中的隊尾 */ spinlock_t *lock; /* protects concurrent modifications : 用於put和get過程中加鎖防止並發*/ };
以上是kfifo的數據結構,kfifo主要提供了如下操作:
//根據給定buffer創建一個kfifo struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock);
//給定size分配buffer和kfifo struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock); //釋放kfifo空間 void kfifo_free(struct kfifo *fifo);
//向kfifo中添加數據 unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len); //從kfifo中取數據 unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len);
//獲取kfifo中有數據的buffer大小 unsigned int kfifo_len(struct kfifo *fifo);
(1)初始化部分:
/* 創建隊列 */
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { struct kfifo *fifo; /* size must be a power of 2 :判斷是否為2的冪*/ BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo; }
/* 分配空間 */
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { unsigned char *buffer; struct kfifo *ret; if (!is_power_of_2(size)) { /* 判斷是否為2的冪 */ BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); /* 如果不是則向上擴展成2的冪 */ } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret; }
巧妙點①:保證buffer size為2的冪
通常循環隊列入隊和出隊操作要不斷的對size 進行求余,一般采用 mInOffset % size(其他類似) 的方法,但是乘、除和求余等會執行多次加法器運算,它們沒有單純的加法運算效率高,更沒有位運算效率高。
所以kfifo->size的值總是在調用者傳進來的size參數的基礎上向2的冪擴展,目的是使 kfifo->size 取模運算可以轉化為與運算(提高運行效率):kfifo->in % kfifo->size 可以轉化為 kfifo->in & (kfifo->size – 1);
例如 size 為 16,求 3 % size 通常的做法是 3 % size = 3 , 現在變為 3 & (size - 1) = 3 size - 1 = 16 - 1 = 10000 - 1 = 01111 , 3 = 00011 , 3 & (size - 1) = 00011 & 01111 = 00011 = 3.
再看其他幾個例子 :
5 & (size - 1) = 00101 & 01111 = 00101 = 5
8 & (size - 1) = 01000 & 01111 = 01000 = 8
15 & (size -1) = 01111 & 01111 = 01111 = 15
16 & (size - 1) = 10000 & 01111 = 00000 = 0
26 & (size - 1 ) = 11010 & 01111 = 01010 = 10 (26 % 16 = 10)
所以保證size是2的冪的前提下可以通過位運算的方式求余,在頻繁操作隊列的情況下可以大大提高效率。
(2)入隊、出隊操作:
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret; } static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //當fifo->in == fifo->out時,buufer為空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret; } unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned int l; //buffer中空的長度 len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); // 內存屏障:smp_mb(),smp_rmb(), smp_wmb(),來保證對方觀察到的內存操作順序 /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; //每次累加,到達最大值后溢出,自動轉為0 return len; } unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; //有數據的緩沖區的長度 len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; //每次累加,到達最大值后溢出,自動轉為0 return len; }
巧妙點②:使用spin_lock_irqsave&spin_unlock_irqrestore 實現同步
Linux內核中通常有spin_lock、spin_lock_irq 和 spin_lock_irqsave,分別何時使用呢?
spin_lock的調用關系:
spin_lock | + -----> raw_spin_lock static inline void __raw_spin_lock(raw_spinlock_t *lock) { preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
spin_lock_irq的調用關系:
spin_lock_irq
|
+-------> raw_spin_lock_irq static inline void __raw_spin_lock_irq(raw_spinlock_t *lock) { local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
可以看出來他們兩者只有一個差別:是否調用local_irq_disable()函數, 即是否禁止本地中斷。在任何情況下使用spin_lock_irq都是安全的。因為它既禁止本地中斷,又禁止內核搶占。spin_lock比spin_lock_irq速度快,但是它並不是任何情況下都是安全的。
舉個例子:
進程A中調用了spin_lock(&lock) 然后進入臨界區,此時來了一個中斷(interrupt),該中斷也運行在和進程A相同的CPU上,並且在該中斷處理程序中恰巧也會spin_lock(&lock) 試圖獲取同一個鎖。由於是在同一個CPU上被中斷,進程A會被設置為TASK_INTERRUPT狀態,中斷處理程序無法獲得鎖,會不停的忙等,由於進程A被設置為中斷狀態,schedule()進程調度就無法再調度進程A運行,這樣就導致了死鎖!
但是如果該中斷處理程序運行在不同的CPU上就不會觸發死鎖。 因為在不同的CPU上出現中斷不會導致進程A的狀態被設為TASK_INTERRUPT,當中斷處理程序忙等被換出后,進程A還是有機會獲得CPU,執行並退出臨界區。所以在使用spin_lock時要明確知道該鎖不會在中斷處理程序中使用。
而spin_lock_irqsave是基於spin_lock_irq實現的一個便利接口,在於你不期望在離開臨界區后,改變中斷的開啟/關閉狀態!進入臨界區是關閉的,離開后它同樣應該是關閉的!
如果自旋鎖在中斷處理函數中被用到,那么在獲取該鎖之前需要關閉本地中斷,spin_lock_irqsave 實現下列動作:
① 保存本地中斷狀態;
② 關閉本地中斷;
③ 獲取自旋鎖。
解鎖時通過 spin_unlock_irqrestore完成釋放鎖、恢復本地中斷到之前的狀態等工作。
巧妙點③:代碼為線性結構
代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數據,kfifo每次入隊或出隊只是簡單的 +len 判斷剩余空間,並沒有對kfifo->size 進行取模運算,所以kfifo->in和kfifo->out總是一直增大,直到unsigned in超過最大值時繞回到0這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。
對於給定的kfifo空間大小: kfifo->size 數據空間長度為:kfifo->in - kfifo->out 而剩余空間長度為:kfifo->size - (kfifo->in - kfifo->out)
以 __kfifo_get為例(__kfifo_get類似):
len = min(len, fifo->size - fifo->in + fifo->out); // 此處用min宏代替if-else判斷剩余空間,len賦值為實際要寫入的數據大小 ...... /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); // l = 准備寫入數據的大小和 當前fifo->in在隊列中的游標位置到隊尾之間長度的最小值,其中(fifo->in & (fifo->size - 1) 等價於 fifo->in%fifo->size(注:fifo->size為2的冪) memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); // 如果要寫入的數據長度大於當前fifo->in游標到隊尾的距離,則先拷貝一部分數據填滿至隊尾 /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); // 再從隊頭寫入剩余的數據
巧妙點④:使用內存屏障(Memory Barriers):smp_mb(),smp_rmb(), smp_wmb()
- 內存屏障API函數說明:
mb() 適用於多處理器和單處理器的內存屏障。
rmb() 適用於多處理器和單處理器的讀內存屏障。
wmb() 適用於多處理器和單處理器的寫內存屏障。
smp_mb() 適用於多處理器的內存屏障。
smp_rmb() 適用於多處理器的讀內存屏障。
smp_wmb() 適用於多處理器的寫內存屏障。
- Memory barrier 常用場合包括:
1.實現同步原語(synchronization primitives) 2.實現無鎖數據結構(lock-free data structures) 3.驅動程序
程序在運行時內存實際的訪問順序和程序代碼編寫的訪問順序不一定一致,這就是內存亂序訪問。內存亂序訪問行為出現的理由是為了提升程序運行時的性能。內存亂序訪問主要發生在兩個階段:
1.編譯時,編譯器優化導致內存亂序訪問(指令重排) 2.運行時,多 CPU 間交互引起內存亂序訪問
Memory barrier 能夠讓 CPU 或編譯器在內存訪問上有序。一個 Memory barrier 之前的內存訪問操作必定先於其之后的完成。Memory barrier 包括兩類:
1.編譯器 barrier 2.CPU Memory barrier
很多時候,編譯器和 CPU 引起內存亂序訪問不會帶來什么問題,但一些特殊情況下,程序邏輯的正確性依賴於內存訪問順序,這時候內存亂序訪問會帶來邏輯上的錯誤。
(1)編譯時內存亂序訪問:在編譯時,編譯器對代碼做出優化時可能改變實際執行指令的順序(例如 gcc 下 O2 或 O3 都會改變實際執行指令的順序):
// test.cpp int x, y, r; void f() { x = r; y = 1; }
編譯器優化的結果可能導致 y = 1 在 x = r 之前執行完成。
(2)運行時內存亂序訪問:在運行時,CPU 雖然會亂序執行指令,但是在單個 CPU 的上,硬件能夠保證程序執行時所有的內存訪問操作看起來像是按程序代碼編寫的順序執行的,這時候 Memory barrier 沒有必要使用(不考慮編譯器優化的情況下)。CPU執行指令分:取址、譯碼等等,為了更快的執行指令,CPU采取了流水線的執行方式,編譯器在編譯代碼時為了使指令更適合CPU的流水線執行方式以及多CPU執行,原本的指令就會出現亂序的情況。在亂序執行時,一個處理器真正執行指令的順序由可用的輸入數據決定,而非程序員編寫的順序。
// thread 1 while (!ok); do(x); // thread 2 x = 42; ok = 1;
此段代碼中,ok 初始化為 0,線程 1 等待 ok 被設置為 1 后執行 do 函數。假如線程 2 對內存的寫操作亂序執行,也就是 x 賦值后於 ok 賦值完成,那么 do 函數接受的實參就很可能出乎程序員的意料,不為 42!
-end-