轉自:https://blog.csdn.net/ezimu/article/details/54851148
概述:
等待隊列、工作隊列、Tasklet都是linux驅動很重要的API,下面主要從用法上來講述如何使用API.
應用場景:
-
等待隊列(waitqueue)
linux驅動中,阻塞一般就是用等待隊列來實現,將進程停止在此處並睡眠下,直到條件滿足時,才可通過此處,繼續運行。在睡眠等待期間,wake up時,喚起來檢查條件,條件滿足解除阻塞,不滿足繼續睡下去。 -
工作隊列(workqueue)
工作隊列,將一個work提交到workqueue上,而這個workqueue是掛到一個特殊內核進程上,當這個特殊內核進程被調度時,會從workqueue上取出work來執行。當然這里的work是與函數聯系起來的。這個過程表現為,此刻先接下work,但不立刻執行這個work,等有時間再執行,而這個時間是不確定的。
工作隊列運行在進程上下文,可以睡眠。 -
Tasklet
Tasklet,同樣,也是先接下任務,但不立刻做任務,與work很類似。tasklet運行在軟中斷上下文。軟中斷:有這樣三句話理解”硬中斷是外部設備對CPU的中斷”,”軟中斷通常是硬中斷服務程序對內核的中斷”,”信號則是由內核(或其他進程)對某個進程的中斷”
這三句話,是比較籠統的理解,現在回到linux具體來理解:
-
軟中斷觸發時機:
(1)中斷上下文觸發(在中斷服務程序中),在中斷服務程序退出后,軟中斷會得到立馬處理。
(2)非中斷上下文(也可以理解進程上下文),通過喚醒守護進程ksoftirqd,只有當守護進程得到調度后,軟中斷才會得到處理。
不管是中斷上下文,還是非中斷上下文,最終都是調用__do_softirq實現的軟中斷,在這個函數里面是打開硬件中斷,關閉內核搶占。這就是軟中斷上下文,即開硬件中斷,關閉搶占。 -
tasklet是基於軟中斷實現的,用在中斷服務程序觸發tasklet,則就是中斷下半部分,也是用得最多的情況。用在進程上下文觸發tasklet,則很類似workqueue,但是tasklet不能有睡眠(因為關閉搶占的,不考慮硬件中斷,就是原子性的),也不適合做非常耗時的,如果是非常耗時的,盡量交給workqueue(可以在tasklet回調里面用work,把更耗時,時間要求更不高的,交給workqueue)。
-
軟中斷詳細了解,可參考如下博文:
linux軟中斷機制分析
linux中斷底半部之 softirq 原理與代碼分析
linux軟中斷與硬中斷實現原理概述
硬中斷、軟中斷和信號
等待隊列(waitqueue)
- 定義頭文件:
#include <linux/wait.h>
- 定義和初始化等待隊列頭(workqueue):
靜態的,用宏:
#define DECLARE_WAIT_QUEUE_HEAD(name) \ wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
動態的,也是用宏:
#define init_waitqueue_head(q) \ do { \ static struct lock_class_key __key; \ \ __init_waitqueue_head((q), #q, &__key); \ } while (0)
如:
wait_queue_head_t wq;
init_waitqueue_head(&wq);
- 阻塞接口:
都是些宏:
wait_event(wq, condition)
wait_event_timeout(wq, condition, timeout)
wait_event_interruptible(wq, condition)
wait_event_interruptible_timeout(wq, condition, timeout)
wait_event_hrtimeout(wq, condition, timeout)
wait_event_interruptible_hrtimeout(wq, condition, timeout)
wait_event_interruptible_exclusive(wq, condition)
wait_event_interruptible_locked(wq, condition)
wait_event_interruptible_locked_irq(wq, condition) wait_event_interruptible_exclusive_locked(wq, condition) wait_event_interruptible_exclusive_locked_irq(wq, condition) wait_event_killable(wq, condition) wait_event_lock_irq_cmd(wq, condition, lock, cmd) wait_event_lock_irq(wq, condition, lock) wait_event_interruptible_lock_irq_cmd(wq, condition, lock, cmd) wait_event_interruptible_lock_irq(wq, condition, lock) wait_event_interruptible_lock_irq_timeout(wq, condition, lock, timeout)
接口版本比較多,各自都有自己合適的應用場合,但是常用的是前面四個。
其中wq是我們定義的等待隊列頭,condition為條件表達式,當wake up后,condition為真時,喚醒阻塞的進程,為假時,繼續睡眠。
wait_event:不可中斷的睡眠,條件一直不滿足,會一直睡眠。
wait_event_timeout:不可中斷睡眠,當超過指定的timeout(單位是jiffies)時間,不管有沒有wake up,還是條件沒滿足,都要喚醒進程,此時返回的是0。在timeout時間內條件滿足返回值為timeout或者1;
wait_event_interruptible:可被信號中斷的睡眠,被信號打斷喚醒時,返回負值-ERESTARTSYS;wake up時,條件滿足的,返回0。除了wait_event沒有返回值,其它的都有返回,有返回值的一般都要判斷返回值。如下例:
int flag = 0; if(wait_event_interruptible(&wq,flag == 1)) return -ERESTARTSYS;
wait_event_interruptible_timeout:是wait_event_timeout和wait_event_interruptible_timeout的結合版本,有它們兩個的特點。
其他的接口,用的不多,有興趣可以自己看看。
- 解除阻塞接口(喚醒)
接口也是些宏:
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) #define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) #define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL, 1) #define wake_up_all_locked(x) __wake_up_locked((x), TASK_NORMAL, 0) #define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) #define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
wake_up:一次只能喚醒掛在這個等待隊列頭上的一個進程
wake_up_nr:一次喚起nr個進程(等待在同一個wait_queue_head_t有很多個)
wake_up_all:一次喚起所有等待在同一個wait_queue_head_t上所有進程
wake_up_interruptible:對應wait_event_interruptible版本的wake up
wake_up_interruptible_sync:保證wake up的動作原子性,wake_up這個函數,很有可能函數還沒執行完,就被喚起來進程給搶占了,這個函數能夠保證wak up動作完整的執行完成。
其他的也是與對應阻塞接口對應的。
- 靈活的添加刪除等待隊列頭中的等待隊列:
這小節,可以不看,對應用,不是很重要。
(1)定義:
靜態:
#define DECLARE_WAITQUEUE(name, tsk) \ wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
(2)動態:
wait_queue_t wa;
init_waitqueue_entry(&wa,&tsk);
tsk是進程結構體,一般是current(linux當前進程就是用這個獲取)。還可以用下面的,設置自定義的等待隊列回調函數,上面的是linux默認的一個回調函數default_wake_function(),不過默認的用得最多:
wait_queue_t wa;
wa->private = &tsk; int func(wait_queue_t *wait, unsigned mode, int flags, void *key) { // } init_waitqueue_func_entry(&wa,func);
(回調有什么作用?)
用下面函數將等待隊列,加入到等待隊列頭(帶remove的是從工作隊列頭中刪除工作隊列):
extern void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait); extern void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait); extern void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
上面的阻塞和解除阻塞接口,只能是對當前進程阻塞/解除阻塞,有了這幾個靈活的接口,我們可以單獨定義一個等待隊列,只要獲取進程task_struct指針,我們可以將任何進程加入到這個等待隊列,然后加入到等待隊列頭,我們能將其它任何進程(不僅僅是當前進程),掛起睡眠,當然喚醒時,如果用wake_up_all版本的話,也會一同喚起。這種情況,阻塞不能用上面的接口了,我們需要用下一節講述的接口(schedule()),解除阻塞可以用wake_up,wake_up_interruptible等。
- 更高級靈活的阻塞:
阻塞當前進程的原理:用函數set_current_state()修改當前進程為TASK_INTERRUPTIBLE(不可中斷睡眠)或TASK_UNINTERRUPTIBLE(可中斷睡眠)狀態,然后調用schedule()告訴內核重新調度,由於當前進程狀態已經為睡眠狀態,自然就不會被調度。schedule()簡單說就是告訴內核當前進程主動放棄CPU控制權。這樣來,就可以說當前進程在此處睡眠,即阻塞在這里。
在上一小節“靈活的添加刪等待隊列頭中的等待隊列”,將任意進程加入到waitqueue,然后類似用:
task_struct *tsk; wait_queue_t wa; //假設tsk已經指向某進程控制塊 p->state = TASK_INTERRUPTIBLE;//or TASK_UNINTERRUPTIBLE init_waitqueue_entry(&wa,&tsk);
就能將任意進程掛起,當然,還需要將wa,掛到等待隊列頭,然后用wait_event(&wa),進程就會從就緒隊列中退出,進入到睡眠隊列,直到wake up時,被掛起的進程狀態被修改為TASK_RUNNING,才會被再次調度。(主要是schedule()下面會說到)。
先看下wait_event實現:
#define __wait_event(wq, condition) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \ if (condition) \ break; \ schedule(); \ } \ finish_wait(&wq, &__wait); \ } while (0) #define wait_event(wq, condition) \ do { \ if (condition) \ break; \ __wait_event(wq, condition); \ } while (0)
prepare_to_wait:
定義:void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
功能:將工作隊列wait加入到工作隊列頭q,並將當前進程設置為state指定的狀態,一般是TASK_UNINTERRUPTIBLE或TASK_INTERRUPTIBLE狀態(在這函數里有調用set_current_state)。
第一個參數:工作隊列頭
第二個參數:工作隊列
第三個參數:當前進程要設置的狀態
DEFINE_WAIT:定義一個工作隊列。
finish_wait:用了prepare_to_wait之后,當退出時,一定要用這個函數清空等待隊列。
從這個宏的實現,可以看出睡眠進程過程,prepare_to_wait先修改進程到睡眠狀態,條件不滿足,schedule()就放棄CPU控制權,睡眠,當wake up的時候,阻塞在wq(也可以說阻塞在wait_event處)等待隊列頭上的進程,再次得到運行,接着執行schedule()后面的代碼,這里,顯然是個循環,prepare_to_wait再次設置當前進程為睡眠狀態,然后判斷條件是否滿足,滿足就退出循環,finish_wait將當前進程恢復到TASK_RUNNING狀態,也就意味着阻塞解除。不滿足,繼續睡下去。如此反復等待條件成立。
明白這個過程,用prepare_to_wait和schedule()來實現更為靈活的阻塞,就很簡單了,解除阻塞和前面的一樣用wake_up,wake_up_interruptible等。
wait_queue_t成員flage重要的標志WQ_FLAG_EXCLUSIVE,表示:
- 當一個等待隊列入口有 WQ_FLAG_EXCLUSEVE 標志置位, 它被添加到等待隊列的尾
部. 沒有這個標志的入口項, 添加到開始. - 當 wake_up 被在一個等待隊列上調用, 它在喚醒第一個有 WQ_FLAG_EXCLUSIVE 標
志的進程后停止.
wait_event默認總是將waitqueue加入開始,而wake_up時總是一個一個的從開始處喚醒,如果不斷有waitqueue加入,那么最開始加入的,就一直得不到喚醒,有這個標志,就避免了這種情況。
prepare_to_wait_exclusive()就是加入了這個標志的。
工作隊列:
- 頭文件:
#include <linux/workqueue.h>
- 創建workqueue:
#define create_workqueue(name) \ alloc_workqueue((name), WQ_MEM_RECLAIM, 1) #define create_singlethread_workqueue(name) \ alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
這兩個宏都會返回一個workqueue_struct結構體的指針,並且都會創建進程(“內核線程”)來執行加入到這個workqueue的work。
create_workqueue:多核CPU,這個宏,會在每個CPU上創建一個專用線程。
create_singlethread_workqueue:單核還是多核,都只在其中一個CPU上創建線程。
用法例子:
struct workqueue *wq,*ws; wq = create_workqueue("wqname"); ws = create_singlethread_workqueue("wsname");
- 定義work:
(1)靜態(其實,將這個宏,放到局部變量里面,也是個動態的):
#define DECLARE_WORK(n, f) \ struct work_struct n = __WORK_INITIALIZER(n, f)
用法例子:
void func(struct work_struct *work) { } DECLARE_WORK(wo,func);
(2)動態定義:
#define INIT_WORK(_work, _func) \ do { \ __INIT_WORK((_work), (_func), 0); \ } while (0)
用法例子:
void func(struct work_struct *work) { } struct work_struct wo; INIT_WORK(&wo,func);
還用如下宏,用來修改work綁定的函數:
#define PREPARE_WORK(_work, _func) \ do { \ (_work)->func = (_func); \ } while (0)
如:
void func(struct work_struct *work){} void funca(struct work_struct *work){} struct work_struct wo; INIT_WORK(&wo,func); PREPARE_WORK(&wo,funca);
修改綁定的函數后,當下次調度到,funca函數被調度,不再是func。
(3)將work加入到workqueue
有兩個函數:
bool queue_work(struct workqueue_struct *wq,struct work_struct *work); bool queue_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork, unsigned long delay);
兩個函數的返回值:
返回0,表示work在這之前,已經在workqueue中了
返回非0,表示work成功加入到workqueue中了
queue_delayed_work表示不是馬上把work加入到workqueue中,而是延后delay(時間單位jiffies),再加入。注意它的work(dwork)要用宏(靜態)DECLARE_DELAYED_WORK來定義和初始化,動態的可以用INIT_DELAYED_WORK,用法和沒有延后的差不多。
需要注意:當這個work被調度一次后,就從workqueue中取消了,如果還需要work被調度到(即work中的函數再被調用),需要重新加入到workqueue中,一般可以直接在work綁定的函數,最后一行調用這個兩個函數再次加入。
(4)取消work
有兩個版本
queue_work對應的版本:
bool cancel_work_sync(struct work_struct *work);
注意:調用這個函數,必須確保work所在的workqueue沒被銷毀,調用這函數的進程會等待這個work執行完成(得不到執行,進程會阻塞等待),再取消這個work。這個函數返回后,work肯定是被執行了。
queue_delayed_work對應的版本:
bool cancel_delayed_work(struct delayed_work *dwork); bool cancel_delayed_work_sync(struct delayed_work *dwork);
cancel_delayed_work:返回后,work並不一定被取消,有可能還在運行。
cancel_delayed_work_sync:返回后,work肯定已經被取消了。等到work被執行后,取消完成才返回。
銷毀workqueue
銷毀函數:
void destroy_workqueue(struct workqueue_struct *wq);
在銷毀前,最好調用flush_workqueue來確保在這workqueue上的work都處理完了:
void flush_workqueue(struct workqueue_struct *wq);
總結:工作隊列步驟,首先是創建workqueue和定義初始化work,然后將work加入到workqueue中。最后,不要時,銷毀workqueue。
共享工作隊列
共享隊列,就是系統創建了默認的workqueue,只需要定義初始化work,調用接口就完成。
兩個接口:
bool schedule_work(struct work_struct *work); bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay);
例子:
void func(struct work_struct *work) { } struct work_struct wo; INIT_WORK(&wo,func); schedule_work(&wo);
取消還是用:
bool cancel_work_sync(struct work_struct *work); bool cancel_delayed_work(struct delayed_work *dwork); bool cancel_delayed_work_sync(struct delayed_work *dwork);
對應版本接口,用對應版本接口取消。
取消后,一般需要調用下面接口,確保work完成,並取消了:
void flush_scheduled_work(void);
flush_scheduled_work能確保在系統默認創建的workqueue上所有的work都完成了。
Tasklet
- 頭文件:
#include <linux/interrupt.h>
- 定義和初始化:
(1)靜態:**
struct tasklet_struct { struct tasklet_struct *next; unsigned long state; atomic_t count; void (*func)(unsigned long); unsigned long data; }; #define DECLARE_TASKLET(name, func, data) \ struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data } #define DECLARE_TASKLET_DISABLED(name, func, data) \ struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
name:定義的tasklet_struct結構體變量
func:回調函數void (*func)(unsigned long);
data:私有數據可以是具體一個整數,或者指針。沒有一般為0。
DECLARE_TASKLET定義是直接可以用tasklet_schedule()加入到調度的。
DECLARE_TASKLET_DISABLED定義的,用這個tasklet_schedule()也無法調度到,需要使用tasklet_enable()使能,才可以被調度運行。
(2)動態:
void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data);
用法:
struct tasklet_struct tl; void func(unsigned long){} tasklet_init(&tl,func,0);
函數接口:
void tasklet_schedule(struct tasklet_struct *t); void tasklet_hi_schedule(struct tasklet_struct *t); void tasklet_hi_schedule_first(struct tasklet_struct *t); void tasklet_disable_nosync(struct tasklet_struct *t); void tasklet_disable(struct tasklet_struct *t); void tasklet_enable(struct tasklet_struct *t); void tasklet_kill(struct tasklet_struct *t); void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data);
tasklet_schedule:將tasklet加入到調度鏈表里面,tasklet就能得到執行,每調用這個函數一次,tasklet只能執行一次,要再次執行需要重新調用這個函數。
tasklet_hi_schedule:比tasklet_schedule優先級更高,可以得到更快處理。
tasklet_hi_schedule_first:和tasklet_hi_schedule差不多,只是更安全。
tasklet_disable:禁止tasklet,即使tasklet_schedule已經把tasklet調度鏈表里,也得不到執行,必須要用tasklet_enable使能才可以。如果當前tasklet正在運行,tasklet_disable會等待執行完,然后禁止,返回。
tasklet_disable_nosync:和tasklet_disable一樣,如果當前tasklet在運行,這個函數不會等待完成就先返回,當tasklet完成退出后,再禁止。
tasklet_enable:使能tasklet,和tasklet_disable要成對使用。
tasklet_kill:設備關閉和模塊卸載的時候,調用來殺死tasklet。如果當前tasklet在運行,會等待完成后,再殺死。
tasklet_init:初始化tasklet。
tasklet步驟:定義初始化綁定函數,然后調用接口把tasklet加入到調度,在這個過程中,可以使能和禁止。