聊一聊Linux中的工作隊列


2018-01-18


 

工作隊列是Linux內核中把工作延遲執行的一種手段,其目的不同於軟中斷,軟中斷是提高CPU的響應,盡可能的縮短關中斷的時間;而工作隊列主要目的是節省資源,其比較適合很微小的任務,比如執行某個喚醒工作等。通過創建線程同樣可以達到目的,但是線程畢竟有其自身的資源開銷如CPU、內存等。如果某個任務很小的話,就不至於創建一個線程,因此Linux內核提供了工作隊列這種方式。本文參考內核代碼3.10.1版本,而此時的工作隊列稱為Concurrency Managed Workqueue (cmwq),對於傳統的工作隊列,本文就不做介紹。

一、總體描述

在詳細介紹工作隊列前,我們先看下相關的核心數據結構

struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;//工作處理函數
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

這是工作隊列機制暴露給外部(使用方)的工作對象,entry維護該結構在worker_pool中的鏈表,func是一個函數指針,指向該工作需要執行的處理函數,而data成員從代碼還未看出具體作用。一個驅動程序后者內核模塊要使用工作隊列,創建一個work_struct結構,填充其中的func字段即可,之后調用schedule_work提交給對象即可。關於schedule_work后面我們在描述,下面開始展開內核對於工作隊列的管理。

內核中既然把工作隊列作為一種資源使用,其自然有其自身的管理規則,因此在內核中涉及到一下對象:

  • worker    工作者,顧名思義為處理工作的單位
  • worker_pool   工作者池,每個worker必然屬於某個worker_pool,一個worker_pool可以有多個worker
  • workqueue_struct     官方解釋是對外部可見的workqueue
  • pool_workqueue   鏈接workqueue_struct  和worker_pool的中介,每個workqueue_struct 可以有多個worker_pool,而一個worker_pool只能屬於一個workqueue_struct    

 幾個對象之間的關系如下圖所示:

如前所述,外部使用的意思就是如果要使用工作隊列,就是創建好work_struct結構,然后調用schedule_work即可,剩下的處理任務就是系統部分完成了。每個和外部交互的workqueue_struct,對應有多個pwq(pool_workqueue   ),pool_workqueue 鏈接workqueue_struct和worker_pool的橋梁,worker_pool是核心所在,其包含有所有的worker,以及該pool對應的item即work_struct。其中worker其實就是一個線程,根據busy后者空閑位於hash表或者鏈表中。而所有的item就通過雙鏈表的方式鏈接到worker_pool維護的鏈表頭上。

 二、具體介紹

2.1 workqueue(workqueue_struct)

 該結構是 externally visible workqueue,即外部可見的工作隊列,而其本身主要描述隊列的屬性,既不包含worker也不包含work。一個workqueue對應多個pwd,這些pwq鏈接在workqueue_struct結構中的pwqs鏈表頭上。而系統中所有的workqueue通過list字段鏈接成雙鏈表。系統內部已經定義了幾個workqueue,如下所示

struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_highpri_wq);
struct workqueue_struct *system_long_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_long_wq);
struct workqueue_struct *system_unbound_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_unbound_wq);
struct workqueue_struct *system_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_freezable_wq);

 

 而一般情況下,系統中通過schedule_work均是把work加入到system_wq中。從代碼來看,系統中的workqueue根據使用情況可以分為兩種:普通的workqueue和unbound workqueue。前者的worker一般是和CPU綁定的,系統會為每個CPU創建一個pwd,而針對后者,就不和單個CPU綁定,而是針對NUMA節點,創建pwd。

2.2  worker    

worker是具體處理work的對象,系統把worker作為一種資源管理,提出了worker_pool的概念,一個worker必定會屬於某個worker_pool,worker結構如下

struct worker {
    /* on idle list while idle, on busy hash table while busy */
    union {
        struct list_head    entry;    /* L: while idle */
        struct hlist_node    hentry;    /* L: while busy */
    };
    struct work_struct    *current_work;    /* L: work being processed */
    work_func_t        current_func;    /* L: current_work's fn */
    struct pool_workqueue    *current_pwq; /* L: current_work's pwq */
    bool            desc_valid;    /* ->desc is valid */
    struct list_head    scheduled;    /* L: scheduled works */
    /* 64 bytes boundary on 64bit, 32 on 32bit */
    struct task_struct    *task;        /* I: worker task */
    struct worker_pool    *pool;        /* I: the associated pool */
                        /* L: for rescuers */
    unsigned long        last_active;    /* L: last active timestamp */
    unsigned int        flags;        /* X: flags */
    int            id;        /* I: worker id */
    /*
     * Opaque string set with work_set_desc().  Printed out with task
     * dump for debugging - WARN, BUG, panic or sysrq.
     */
    char            desc[WORKER_DESC_LEN];
    /* used only by rescuers to point to the target workqueue */
    struct workqueue_struct    *rescue_wq;    /* I: the workqueue to rescue */
};

 

一個worker根據自身狀態不同會處於不同的數據結構中,當worker沒有任務要處理就是idle狀態,處於worker_pool維護的鏈表中;當worker在處理任務,就處於worker_pool維護的hash表中。task字段指向該worker對象線程的task_struct結構。pool指向其隸屬的worker_pool。而如果該worker是一個rescuer worker,最后一個字段指向其對應的workqueue。當worker在處理任務時,current_work指向正在處理的work,current_func是work的處理函數,current_pwd指向對應的pwq。worker的線程處理函數為worker_thread。

static int worker_thread(void *__worker)
{
    struct worker *worker = __worker;
    struct worker_pool *pool = worker->pool;

    /* tell the scheduler that this is a workqueue worker */
    worker->task->flags |= PF_WQ_WORKER;
woke_up:
    spin_lock_irq(&pool->lock);

    /* am I supposed to die? */
    if (unlikely(worker->flags & WORKER_DIE)) {
        spin_unlock_irq(&pool->lock);
        WARN_ON_ONCE(!list_empty(&worker->entry));
        worker->task->flags &= ~PF_WQ_WORKER;
        return 0;
    }
    /*worker只有在執行任務時才是idle狀態*/
    worker_leave_idle(worker);
recheck:
    /* no more worker necessary? */
    if (!need_more_worker(pool))
        goto sleep;

    /* do we need to manage? */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /*
     * ->scheduled list can only be filled while a worker is
     * preparing to process a work or actually processing it.
     * Make sure nobody diddled with it while I was sleeping.
     */
    WARN_ON_ONCE(!list_empty(&worker->scheduled));

    /*
     * Finish PREP stage.  We're guaranteed to have at least one idle
     * worker or that someone else has already assumed the manager
     * role.  This is where @worker starts participating in concurrency
     * management if applicable and concurrency management is restored
     * after being rebound.  See rebind_workers() for details.
     */
    worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

    do {
        //從pool中摘下一個work_struct
        struct work_struct *work =
            list_first_entry(&pool->worklist,
                     struct work_struct, entry);

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */
            process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    } while (keep_working(pool));

    worker_set_flags(worker, WORKER_PREP, false);
sleep:
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;

    /*
     * pool->lock is held and there's no work to process and no need to
     * manage, sleep.  Workers are woken up only while holding
     * pool->lock or from local cpu, so setting the current state
     * before releasing pool->lock is enough to prevent losing any
     * event.
     */
     /*恢復idle狀態*/
    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();
    goto woke_up;
}

 

從該函數可以看出worker只有在處理任務時,才是idle狀態。在執行任務前通過worker_leave_idle把worker從idle鏈表摘下並清除idle標志。然后會檢查當前pool是否需要更多的worker,如果不需要則繼續睡眠。怎么判斷是否需要呢?這里有一個函數need_more_worker

static bool need_more_worker(struct worker_pool *pool)
{
    /*如果工作者鏈表不為空且現在沒有並發*/
    return !list_empty(&pool->worklist) && __need_more_worker(pool);
}
static bool __need_more_worker(struct worker_pool *pool)
{
    return !atomic_read(&pool->nr_running);
}

 

針對unbound pool,只要存在work,那么該函數就返回true,因為unbound的pool並不計算nr_running。但是從這里看,針對普通的pool,只有在worklist不為空且沒有正在運行的worker時才會返回true,那么怎么同時讓多個worker同時運行呢??不解!如果確實需要則檢查下是否需要管理worker,因為此時需要worker,所以需要判斷下有沒有idle的worker,如果沒有則調用manage_workers進行管理,該函數中兩個核心處理函數就是maybe_destroy_workers和maybe_create_worker。待檢查過后,就開始具體的處理了,核心邏輯都在一個循環體中。

具體處理過程比較明確,先從pool的worklist中摘下一個work,如果該work沒有設置WORK_STRUCT_LINKED標志,就直接調用process_one_work函數進行處理,如果worker->scheduled鏈表不為空,則調用process_scheduled_works對鏈表上的work進行處理;如果work設置了WORK_STRUCT_LINKED標志,則需要把work移動到worker的scheduled鏈表上,然后通過process_scheduled_works進行處理。而循環的條件是keep_working(pool),即只要worklist不為空且在運行的worker數目小於等於1(這里也不太明白,為何是小於等於1)。處理單個work的流程看process_one_work

該函數一個比較重要的驗證就是判斷當前work是否已經有別的worker在處理,如果存在則需要把work加入到對應worker的scheduled鏈表,以避免多個worker同時處理同一work;如果沒問題就着手開始處理。具體處理過程比較簡單,把worker加入到busy的hash表,然后設置worker的相關字段,主要是current_work、current_func和current_pwq。然后把work從鏈表中刪除,之后就執行work的處理函數進行處理。當worker處理完成后,需要把worker從hash表中刪除,並把相關字段設置默認值。

process_scheduled_works就比較簡單,就是循環對worker中scheduled鏈表中的work執行處理,具體處理方式就是調用process_one_work。

2.3 worker_pool

顧名思義,worker_pool本身的重要任務就是管理worker,除此之外,worker_pool還管理用戶提交的work。在worker_pool中有一個鏈表頭idle_list,鏈接worker中的entry,對應於空閑的worker;而hash表busy_hash鏈接worker中的hentry,對應正在執行任務的worker。nr_workers和nr_idle代表worker和idle worker的數量。系統中worker_pool是一個perCPU變量,看下worker_pool的聲明

static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                     cpu_worker_pools);

 

每個CPU對應有兩個worker_pool,一個針對普通的workqueue,一個針對高優先級workqueue。而PWQ也是perCPU變量,即一個workqueue在每個CPU上都有對應的pwq,也就有對應的worker_pool。、

下篇文章介紹下workqueue的創建以及worker的管理。

以馬內利

參考資料:

LInux內核3.10.1源碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM