kworker內核工作隊列詳解


工作隊列是另一種將工作推后執行的形式,它可以把工作交給一個內核線程去執行,這個下半部是在進程上下文中執行的,因此,它可以重新調度還有睡眠。
    區分使用軟中斷/tasklet還是工作隊列比較簡單,如果推后的工作不需要睡眠,那么就選擇軟中斷或tasklet,但如果需要一個可以重新調度,可以睡眠,可以獲取內存,可以獲取信號量,可以執行阻塞式I/O操作時,那么,請選擇工作隊列吧!
    在老的內核當中(2.6.36之前)工作隊列是內核創建一個專門的工作者線程,它有一條任務鏈表,當設備或者內核某進程有部分任務需要推后處理的時候就把任務掛載在工作者線程的任務鏈表上,然后會在未來的某個時刻,工作者線程被調度,它就會去處理掛載在任務鏈表上的任務了。
    內核有一個缺省的工作者線程叫events/nn是處理器的編號:每個處理器對應一個線程。如單處理器只有一個events/0,而雙處理器會多出一個events/1。當然,我們也可以創建我們自己的工作者線程。假如有某個任務會被頻繁的觸發,那么便可以為它創建一個專門的工作者線程,比如觸摸屏CTP
    然而在2.6.36之后的內核當中對工作隊列子系統作了改變,采用的機制改變為並發管理工作隊列機制(Concurrency Managed Workqueue (cmwq))。在原來的機制當中,當kernel需要創建一個workqueuecreate_workqueue()方式)的時候,它會在每一個cpu上創建一個work_thread,為每一個cpu分配一個struct cpu_workqueue_struct,隨着Kernel創建越來越多的workqueue,這將占用大量的的內存資源,並且加重了進程調度的任務量。而在新的工作隊列機制中它不再在每次create_workqueue時都為workqueue創建一個work thread,而是在系統啟動的時候給每個cpu創建一個work thread,當有任務項work_struct需要處理時,系統會將任務項work_struct交給某個處理器的work thread上去處理。

 

    首先來了解一下一個重要的數據結構:並發管理工作隊列的后端 gcwq
    struct global_cwq {

 

spinlock_t lock; /* the gcwq lock */

struct list_head worklist; /* L: list of pending works 需要處理的work_struct都掛載在這個鏈表上

unsigned int cpu; /* I: the associated cpu 與gcwq相關聯的cpu

unsigned int flags; /* L: GCWQ_* flags */

 

int nr_workers; /* L: total number of workers 總的工作者數量

int nr_idle; /* L: currently idle ones 當前空閑的工作者數量

 

/* workers are chained either in the idle_list or busy_hash */

struct list_head idle_list; /* X: list of idle workers 空閑的工作者鏈表

struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];正在執行的工作者線程的哈希表

/* L: hash of busy workers */

 

struct timer_list idle_timer; /* L: worker idle timeout */

struct timer_list mayday_timer; /* L: SOS timer for dworkers */

 

struct ida worker_ida; /* L: for worker IDs */

 

struct task_struct *trustee; /* L: for gcwq shutdown */

unsigned int trustee_state; /* L: trustee state */

wait_queue_head_t trustee_wait; /* trustee wait */

struct worker *first_idle; /* L: first idle worker */

} ____cacheline_aligned_in_smp;
    這個結構每個CPU都有一個,通常我們可以根據CPU在系統中的編號來獲取到這個結構,並且 gcwq會將與它關聯的CPU編號保存在其成員cpu當中。Gcwqs幾乎會處理所有的工作隊列的任務。
   在系統啟動過程中,會對gcwqs做一個初始化工作,在workqueue.cinit_workqueues(void)中:

 

{

unsigned int cpu;

int i;

 

cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);

cpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);/*主要用於支持熱插拔CPU系統,注冊CPU熱插拔事件,這樣可以在CPU上線離線的時候創建或者銷毀工作線程

 

/* initialize gcwqs */

for_each_gcwq_cpu(cpu) {//遍歷每個CPU,初始化每個CPU的gcwq

struct global_cwq *gcwq = get_gcwq(cpu);根據CPU編號獲取它的gcwq結構

 

spin_lock_init(&gcwq->lock);

INIT_LIST_HEAD(&gcwq->worklist);//初始化掛載未處理工作項的鏈表

gcwq->cpu = cpu;//保存CPU編號

gcwq->flags |= GCWQ_DISASSOCIATED;//設置當前gcwq暫時不可用

 

INIT_LIST_HEAD(&gcwq->idle_list);初始化空閑工作者線程鏈表

for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)

INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

/*初始化定時器*/

init_timer_deferrable(&gcwq->idle_timer);

gcwq->idle_timer.function = idle_worker_timeout;

gcwq->idle_timer.data = (unsigned long)gcwq;

 

setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,

    (unsigned long)gcwq);

 

ida_init(&gcwq->worker_ida);

 

gcwq->trustee_state = TRUSTEE_DONE;

init_waitqueue_head(&gcwq->trustee_wait);

}

 

/* create the initial worker 為當前活動的CPU創建工作者線程*/

for_each_online_gcwq_cpu(cpu) {

struct global_cwq *gcwq = get_gcwq(cpu);

struct worker *worker;

/*遍歷每個CPU,根據CPU編號獲取對應的gcwq,然后為每個CPU創建一個工作者線程,並利用gcwq及參數true將工作者線程與CPU綁定起來*/

if (cpu != WORK_CPU_UNBOUND)

gcwq->flags &= ~GCWQ_DISASSOCIATED;

worker = create_worker(gcwq, true);

BUG_ON(!worker);

spin_lock_irq(&gcwq->lock);

start_worker(worker);/*啟動工作者線程,開始處理工作項,因為工作項是掛載在gcwq鏈表上的,因此會對gcwq的數據進行改動,所以這里需要鎖來對gcwq作保護*/

spin_unlock_irq(&gcwq->lock);

}

 

/*通過傳遞不同的參數來創建用途不同的工作隊列:

events:普通工作隊列,大部分的延遲處理函數都在這個工作隊列上運行,要求任務執行時間短,避免互相影響。

events_long:需要長時間運行的工作項可在這個工作隊列上運行。

events_nrt:該工作隊列上的任務是不可重入的,也就是該工作隊列上的任務在某處理上執行的時候,它不會在其他處理上再執行這些任務。

events_unbound:該工作隊列上的任務不會綁定在特定CPU上,只要某處理器空閑,就可以處理該工作隊列上的任務。

events_freezable:類似於events,但工作項都是被掛起的

events_nrt_freezable:類似events_nrt。

system_wq = alloc_workqueue("events", 0, 0);

system_long_wq = alloc_workqueue("events_long", 0, 0);

system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);

system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,

    WQ_UNBOUND_MAX_ACTIVE);

system_freezable_wq = alloc_workqueue("events_freezable",

      WQ_FREEZABLE, 0);

system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",

WQ_NON_REENTRANT | WQ_FREEZABLE, 0);

BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||

       !system_unbound_wq || !system_freezable_wq ||

!system_nrt_freezable_wq);

return 0;

}

    在上面我們看到了,系統在啟動的時候會為每個gcwq創建一個工作者線程管理者worker,這個worker就是用來管理gcwq上掛載的工作項的:

                struct worker {

                /* on idle list while idle, on busy hash table while busy */

                union {

                struct list_head entry; /* L: while idle */

                struct hlist_node hentry; /* L: while busy */

                };

//與工作者線程的狀態有關,當工作者線程空閑時,使用entry,但忙碌時,使用哈希節點hentry,它們分別對應gcwq的idle_list和busy_hash[]

                struct work_struct *current_work; /* L: work being processed 當前正在處理的工作項

                struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq worker所對應的cpu_workqueue_struct

                struct list_head scheduled; /* L: scheduled works 被調度的工作項,只有進入該鏈表,工作項才能被工作隊列處理。

                struct task_struct *task; /* I: worker task 被內核調度的線程實體,通過它參與到內核調度當中去,

                struct global_cwq *gcwq; /* I: the associated gcwq 相關聯的gcwq

                /* 64 bytes boundary on 64bit, 32 on 32bit */

                unsigned long last_active; /* L: last active timestamp 最后的活動時間,用於決定該worker的生命周期

                unsigned int flags; /* X: flags */

                int id; /* I: worker id */

                struct work_struct rebind_work; /* L: rebind worker to cpu */

                };

在初始化gcwq過程當中會創建一個worker,而且worker中又有一個成員task_struct,因此我們可以想像的到在創建worker時會發生什么:

static struct worker *create_worker(struct global_cwq *gcwq, bool bind)

{

.........

worker = alloc_worker();//為一個worker分配內存並完成一些初始化工作

if (!worker)

goto fail;

 

worker->gcwq = gcwq;//綁定gcwq

worker->id = id;

//以下會根據workergcwqcpu標示是否on_unbound來創建worker的線程,使worker能參與到進程調度當中去。

if (!on_unbound_cpu)

worker->task = kthread_create_on_node(worker_thread,

      worker,

      cpu_to_node(gcwq->cpu),

      "kworker/%u:%d", gcwq->cpu, id);

else

worker->task = kthread_create(worker_thread, worker,

      "kworker/u:%d", id);

if (IS_ERR(worker->task))

goto fail;

.........

}

worker創建的工作者線程會在創建好后啟動,開始處理工作項。工作者線程的主要工作在worker_thread中完成:

static int worker_thread(void *__worker)

{

struct worker *worker = __worker;

struct global_cwq *gcwq = worker->gcwq;

 

/* tell the scheduler that this is a workqueue worker */

worker->task->flags |= PF_WQ_WORKER;//告訴調度器這是一個工作者線程

woke_up:

spin_lock_irq(&gcwq->lock);//鎖住gcwq,每個cpu只有一個gcwq結構,該CPU的其他線程同樣有可能對它進行改變,因此這里需要加鎖保護

 

/* DIE can be set only while we're idle, checking here is enough */

if (worker->flags & WORKER_DIE) {

spin_unlock_irq(&gcwq->lock);

worker->task->flags &= ~PF_WQ_WORKER;

return 0;

}

//首先讓工作者線程的管理者從空閑狀態退出,因為它現在要work了,這里的主要工作是清除worker的空閑狀態標志,減少gcwqnr_idle數量,並將workerentry刪除並重新初始化。

worker_leave_idle(worker);

recheck:

/* no more worker necessary? */

if (!need_more_worker(gcwq))//這里會對gcwq進行檢查:gcwq上的worklist上是否掛載有未處理的工作項,如果沒有,說明當前工作者線程無事可做,睡眠。檢查當前CPUgcwqworker如果有更高優先級的工作要處理,且系統的全局隊列中已經沒有空閑的worker了,那么這時應該需要一個新的worker

goto sleep;

if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))

goto recheck;

BUG_ON(!list_empty(&worker->scheduled));

worker_clr_flags(worker, WORKER_PREP);

//這里做一些處理工作項work_struct的准備工作如查看gcwq的空閑工作者數量,提示worker上不能有准備調度的工作項,清除和設置worker的一些標志。

do {

struct work_struct *work =

list_first_entry(&gcwq->worklist,

 struct work_struct, entry);//gcwqworklist鏈表上獲取未處理的工作項work_struct

if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {

/* optimization path, not strictly necessary */

process_one_work(worker, work);

if (unlikely(!list_empty(&worker->scheduled)))

process_scheduled_works(worker);

} else {

move_linked_works(work, &worker->scheduled, NULL);

process_scheduled_works(worker);

}

} while (keep_working(gcwq));

在這個do while()循環里處理掛載在gcwq上的未處理的工作項。一般情況下,會在do while()循環里把gcwqworklist的所有未處理完的工作項都處理完后終止。在循環中,它通過process_one_work(worker, work)逐個的將工作項處理掉,process_scheduled_works(worker)內部同樣是調用process_one_work(worker, work):

static void process_one_work(struct worker *worker, struct work_struct *work)

__releases(&gcwq->lock)

__acquires(&gcwq->lock)

{

........

work_func_t f = work->func;

........

trace_workqueue_execute_start(work);

f(work);

trace_workqueue_execute_end(work);

.........

}

因此,process_one_work()的最終目的是執行work_structfunc函數,也就是每個工作項需要完成的工作。

 

工作隊列的使用:

工作項:

當然首先我們先來了解一下工作項:

struct work_struct {

atomic_long_t data;//work_struct 的參數

struct list_head entry;//work_struct 的連接鏈表

work_func_t func;//工作項處理函數

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map;

#endif

};
有兩種方法來創建工作項:
靜態:DECLARE_WORK(struct work_struct *work,void(*func) (void *));
動態:INIT_WORK(struct work_struct *work,void(*func)(void *));
無論是靜態還是動態的方法,它們的主要任務就是初始化好一個work_struct結構:
初始化work_structdataentry值,並且將func指向一個可執行函數:

 

do { \

__init_work((_work), _onstack); \

(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \

INIT_LIST_HEAD(&(_work)->entry); \

PREPARE_WORK((_work), (_func)); \

} while (0);
當一個工作項被初始化好之后,表明它可以進入工作狀態了,這時候我們就可以調度它了,如果我們把它交給系統默認線程去執行:

 

Schedule_work(work);

{

return queue_work(system_wq, work);

}
從代碼可知,當調度執行工作項的時候,其實是去queue_work(system_wq,work),而從前面的初始化gcwqs過程中可知,system_wq是在系統啟動過程中創建的一個普通的工作隊列,也就是說,我們這里初始化的工作項會交給
統啟動過程中的普通工作隊列的工作者線程去處理。
當然,但我們有足夠的理由需要去創建一個新的工作隊列的時候我們可以:

 

Create_workqueue(const char *name);

創建一個名字為name的工作隊列;

#define create_workqueue(name) \

alloc_workqueue((name), WQ_MEM_RECLAIM, 1) //在初始化gcwqs時使用同樣的方式創建了幾個默認的系統工作隊列。

創建工作隊列的主要工作在struct workqueue_struct *__alloc_workqueue_key(const char *fmt,

       unsigned int flags,

       int max_active,

       struct lock_class_key *key,

       const char *lock_name, ...)

{

.........

wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);

if (!wq)

goto err;

//為一個workqueue_struct分配內存。

vsnprintf(wq->name, namelen, fmt, args1);//初始化wqname

/* init wq */

wq->flags = flags;

wq->saved_max_active = max_active;

mutex_init(&wq->flush_mutex);

atomic_set(&wq->nr_cwqs_to_flush, 0);

INIT_LIST_HEAD(&wq->flusher_queue);

INIT_LIST_HEAD(&wq->flusher_overflow);

 

lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

INIT_LIST_HEAD(&wq->list);//初始化wq的主要成員

......

for_each_cwq_cpu(cpu, wq) {

struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

struct global_cwq *gcwq = get_gcwq(cpu);

 

BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);

cwq->gcwq = gcwq;

cwq->wq = wq;

cwq->flush_color = -1;

cwq->max_active = max_active;

INIT_LIST_HEAD(&cwq->delayed_works);

}

//per-cpu workqueuewqgcwq關聯起來。Cpu_workqueque_struct中保存了與CPU關聯的gcwq,在這里,又關聯了wq

......

list_add(&wq->list, &workqueues);//最后把wq放到一個workqueues的鏈表上。

.........

Return wq;

}

它會返回一個初始化好的工作隊列wq
在整個創建workqueue的流程當中我們可以看到,它並沒有為新的workqueue去創建一個工作者線程,而是將wqcpu_workqueue_struct關聯起來,在這個cpu_workqueue_struct結構中還關聯了gcwq,然后把workqueue放到workqueues鏈表上去。
創建好了我們自己的工作隊列之后,我們可以用自己的工作隊列去執行工作項:
Queue_work(struct workqueue_struct *wq,struct wori_struct *work);
與上面的使用系統默認的工作隊列調度工作項比較,就是使用了自己創建的而不是系統開機啟動時創建的工作隊列。
現在我們來看看系統到底是怎么去執行工作隊列的吧:

 

Queue_work()->queue_work_on()->__queue_work()

{

struct global_cwq *gcwq;

struct cpu_workqueue_struct *cwq;

struct list_head *worklist;

unsigned int work_flags;

unsigned long flags;

 

debug_work_activate(work);

 

/* if dying, only works from the same workqueue are allowed */

if (unlikely(wq->flags & WQ_DRAINING) &&

    WARN_ON_ONCE(!is_chained_work(wq)))

return;

 

/* determine gcwq to use */

if (!(wq->flags & WQ_UNBOUND)) {

struct global_cwq *last_gcwq;

 

if (unlikely(cpu == WORK_CPU_UNBOUND))

cpu = raw_smp_processor_id();

 

/*

 * It's multi cpu.  If @wq is non-reentrant and @work

 * was previously on a different cpu, it might still

 * be running there, in which case the work needs to

 * be queued on that cpu to guarantee non-reentrance.

 */

gcwq = get_gcwq(cpu);

if (wq->flags & WQ_NON_REENTRANT &&

    (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {

struct worker *worker;

 

spin_lock_irqsave(&last_gcwq->lock, flags);

 

worker = find_worker_executing_work(last_gcwq, work);

 

if (worker && worker->current_cwq->wq == wq)

gcwq = last_gcwq;

else {

/* meh... not running there, queue here */

spin_unlock_irqrestore(&last_gcwq->lock, flags);

spin_lock_irqsave(&gcwq->lock, flags);

}

} else

spin_lock_irqsave(&gcwq->lock, flags);

} else {

gcwq = get_gcwq(WORK_CPU_UNBOUND);//

spin_lock_irqsave(&gcwq->lock, flags);

}

//前面這部分代碼的主要功能是根據wq的標志位來獲取合適的gcwq

/* gcwq determined, get cwq and queue */

cwq = get_cwq(gcwq->cpu, wq);

trace_workqueue_queue_work(cpu, cwq, work);

 

if (WARN_ON(!list_empty(&work->entry))) {

spin_unlock_irqrestore(&gcwq->lock, flags);

return;

}

 

cwq->nr_in_flight[cwq->work_color]++;

work_flags = work_color_to_flags(cwq->work_color);

 

if (likely(cwq->nr_active < cwq->max_active)) {

trace_workqueue_activate_work(work);

cwq->nr_active++;

worklist = gcwq_determine_ins_pos(gcwq, cwq);//gcwqworklist上(也就是未執行工作項鏈表)找到一個合適的位置

} else {

work_flags |= WORK_STRUCT_DELAYED;

worklist = &cwq->delayed_works;

}

 

insert_work(cwq, work, worklist, work_flags);//將工作項插入到對應gcwqworklist的合適位置上。

 

spin_unlock_irqrestore(&gcwq->lock, flags);

}

因此,用一句簡單的句可以概括queue_work(),就是把工作項work放到與wq關聯的gcwq的未執行工作項鏈表上去。前面有分析,gcwq對應的工作者線程在被調度的時候會把gcwq上的未執行的工作項都執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM