Linux內核中的軟中斷、tasklet和工作隊列詳解【轉】


轉自:https://blog.csdn.net/godleading/article/details/52971179

TOC]
本文基於Linux2.6.32內核版本。

引言

軟中斷、tasklet和工作隊列並不是Linux內核中一直存在的機制,而是由更早版本的內核中的“下半部”(bottom half)演變而來。下半部的機制實際上包括五種,但2.6版本的內核中,下半部和任務隊列的函數都消失了,只剩下了前三者。
介紹這三種下半部實現之前,有必要說一下上半部與下半部的區別。
上半部指的是中斷處理程序,下半部則指的是一些雖然與中斷有相關性但是可以延后執行的任務。舉個例子:在網絡傳輸中,網卡接收到數據包這個事件不一定需要馬上被處理,適合用下半部去實現;但是用戶敲擊鍵盤這樣的事件就必須馬上被響應,應該用中斷實現。
兩者的主要區別在於:中斷不能被相同類型的中斷打斷,而下半部依然可以被中斷打斷;中斷對於時間非常敏感,而下半部基本上都是一些可以延遲的工作。由於二者的這種區別,所以對於一個工作是放在上半部還是放在下半部去執行,可以參考下面4條:

  1. 如果一個任務對時間非常敏感,將其放在中斷處理程序中執行。
  2. 如果一個任務和硬件相關,將其放在中斷處理程序中執行。
  3. 如果一個任務要保證不被其他中斷(特別是相同的中斷)打斷,將其放在中斷處理程序中執行。
  4. 其他所有任務,考慮放在下半部去執行。
    有寫內核任務需要延后執行,因此才有的下半部,進而實現了三種實現下半部的方法。這就是本文要討論的軟中斷tasklet工作隊列

下表可以更直觀的看到它們之間的關系。
執行緒關系

軟中斷

軟中斷作為下半部機制的代表,是隨着SMP(share memory processor)的出現應運而生的,它也是tasklet實現的基礎(tasklet實際上只是在軟中斷的基礎上添加了一定的機制)。軟中斷一般是“可延遲函數”的總稱,有時候也包括了tasklet(請讀者在遇到的時候根據上下文推斷是否包含tasklet)。它的出現就是因為要滿足上面所提出的上半部和下半部的區別,使得對時間不敏感的任務延后執行,而且可以在多個CPU上並行執行,使得總的系統效率可以更高。它的特性包括:

  • 產生后並不是馬上可以執行,必須要等待內核的調度才能執行。軟中斷不能被自己打斷(即單個cpu上軟中斷不能嵌套執行),只能被硬件中斷打斷(上半部)。
  • 可以並發運行在多個CPU上(即使同一類型的也可以)。所以軟中斷必須設計為可重入的函數(允許多個CPU同時操作),因此也需要使用自旋鎖來保其數據結構。

相關數據結構

  • 軟中斷描述符
    struct softirq_action{ void (*action)(struct softirq_action *);};
    描述每一種類型的軟中斷,其中void(*action)是軟中斷觸發時的執行函數。
  • 軟中斷全局數據和類型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp; enum { HI_SOFTIRQ=0, /*用於高優先級的tasklet*/ TIMER_SOFTIRQ, /*用於定時器的下半部*/ NET_TX_SOFTIRQ, /*用於網絡層發包*/ NET_RX_SOFTIRQ, /*用於網絡層收報*/ BLOCK_SOFTIRQ, BLOCK_IOPOLL_SOFTIRQ, TASKLET_SOFTIRQ, /*用於低優先級的tasklet*/ SCHED_SOFTIRQ, HRTIMER_SOFTIRQ, RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */ NR_SOFTIRQS };

 

相關API

  • 注冊軟中斷
void open_softirq(int nr, void (*action)(struct softirq_action *))
  • 1

即注冊對應類型的處理函數到全局數組softirq_vec中。例如網絡發包對應類型為NET_TX_SOFTIRQ的處理函數net_tx_action.

  • 觸發軟中斷
void raise_softirq(unsigned int nr)
  • 1

實際上即以軟中斷類型nr作為偏移量置位每cpu變量irq_stat[cpu_id]的成員變量__softirq_pending,這也是同一類型軟中斷可以在多個cpu上並行運行的根本原因。

  • 軟中斷執行函數
do_softirq-->__do_softirq
  • 1

執行軟中斷處理函數__do_softirq前首先要滿足兩個條件:
(1)不在中斷中(硬中斷、軟中斷和NMI) 。1
(2)有軟中斷處於pending狀態。
系統這么設計是為了避免軟件中斷在中斷嵌套中被調用,並且達到在單個CPU上軟件中斷不能被重入的目的。對於ARM架構的CPU不存在中斷嵌套中調用軟件中斷的問題,因為ARM架構的CPU在處理硬件中斷的過程中是關閉掉中斷的。只有在進入了軟中斷處理過程中之后才會開啟硬件中斷,如果在軟件中斷處理過程中有硬件中斷嵌套,也不會再次調用軟中斷,because硬件中斷是軟件中斷處理過程中再次進入的,此時preempt_count已經記錄了軟件中斷!對於其它架構的CPU,有可能在觸發調用軟件中斷前,也就是還在處理硬件中斷的時候,就已經開啟了硬件中斷,可能會發生中斷嵌套,在中斷嵌套中是不允許調用軟件中斷處理的。Why?我的理解是,在發生中斷嵌套的時候,表明這個時候是系統突發繁忙的時候,內核第一要務就是趕緊把中斷中的事情處理完成,退出中斷嵌套。避免多次嵌套,哪里有時間處理軟件中斷,所以把軟件中斷推遲到了所有中斷處理完成的時候才能觸發軟件中斷。

實現原理和實例

軟中斷的調度時機:

  1. do_irq完成I/O中斷時調用irq_exit。
  2. 系統使用I/O APIC,在處理完本地時鍾中斷時。
  3. local_bh_enable,即開啟本地軟中斷時。
  4. SMP系統中,cpu處理完被CALL_FUNCTION_VECTOR處理器間中斷所觸發的函數時。
  5. ksoftirqd/n線程被喚醒時。
    下面以從中斷處理返回函數irq_exit中調用軟中斷為例詳細說明。
    觸發和初始化的的流程如圖所示:
    硬中斷觸發流程

軟中斷處理流程

asmlinkage void __do_softirq(void) { struct softirq_action *h; __u32 pending; int max_restart = MAX_SOFTIRQ_RESTART; int cpu; pending = local_softirq_pending(); account_system_vtime(current); __local_bh_disable((unsigned long)__builtin_return_address(0)); lockdep_softirq_enter(); cpu = smp_processor_id(); restart: /* Reset the pending bitmask before enabling irqs */ set_softirq_pending(0); local_irq_enable(); h = softirq_vec; do { if (pending & 1) { int prev_count = preempt_count(); kstat_incr_softirqs_this_cpu(h - softirq_vec); trace_softirq_entry(h, softirq_vec); h->action(h); trace_softirq_exit(h, softirq_vec); if (unlikely(prev_count != preempt_count())) { printk(KERN_ERR "huh, entered softirq %td %s %p" "with preempt_count %08x," " exited with %08x?\n", h - softirq_vec, softirq_to_name[h - softirq_vec], h->action, prev_count, preempt_count()); preempt_count() = prev_count; } rcu_bh_qs(cpu); } h++; pending >>= 1; } while (pending); local_irq_disable(); pending = local_softirq_pending(); if (pending && --max_restart) goto restart; if (pending) wakeup_softirqd(); lockdep_softirq_exit(); account_system_vtime(current); _local_bh_enable(); }

 

  1. 首先調用local_softirq_pending函數取得目前有哪些位存在軟件中斷。
  2. 調用__local_bh_disable關閉軟中斷,其實就是設置正在處理軟件中斷標記,在同一個CPU上使得不能重入__do_softirq函數。
  3. 重新設置軟中斷標記為0,set_softirq_pending重新設置軟中斷標記為0,這樣在之后重新開啟中斷之后硬件中斷中又可以設置軟件中斷位。
  4. 調用local_irq_enable,開啟硬件中斷。
  5. 之后在一個循環中,遍歷pending標志的每一位,如果這一位設置就會調用軟件中斷的處理函數。在這個過程中硬件中斷是開啟的,隨時可以打斷軟件中斷。這樣保證硬件中斷不會丟失。
  6. 之后關閉硬件中斷(local_irq_disable),查看是否又有軟件中斷處於pending狀態,如果是,並且在本次調用__do_softirq函數過程中沒有累計重復進入軟件中斷處理的次數超過max_restart=10次,就可以重新調用軟件中斷處理。如果超過了10次,就調用wakeup_softirqd()喚醒內核的一個進程來處理軟件中斷。設立10次的限制,也是為了避免影響系統響應時間。
  7. 調用_local_bh_enable開啟軟中斷。

軟中斷內核線程

之前我們分析的觸發軟件中斷的位置其實是中斷上下文中,而在軟中斷的內核線程中實際已經是進程的上下文。
這里說的軟中斷上下文指的就是系統為每個CPU建立的ksoftirqd進程。
軟中斷的內核進程中主要有兩個大循環,外層的循環處理有軟件中斷就處理,沒有軟件中斷就休眠。內層的循環處理軟件中斷,每循環一次都試探一次是否過長時間占據了CPU,需要調度就釋放CPU給其它進程。具體的操作在注釋中做了解釋。

    set_current_state(TASK_INTERRUPTIBLE); //外層大循環。 while (!kthread_should_stop()) { preempt_disable();//禁止內核搶占,自己掌握cpu if (!local_softirq_pending()) { preempt_enable_no_resched(); //如果沒有軟中斷在pending中就讓出cpu schedule(); //調度之后重新掌握cpu preempt_disable(); } __set_current_state(TASK_RUNNING); while (local_softirq_pending()) { /* Preempt disable stops cpu going offline. If already offline, we'll be on wrong CPU: don't process */ if (cpu_is_offline((long)__bind_cpu)) goto wait_to_die; //有軟中斷則開始軟中斷調度 do_softirq(); //查看是否需要調度,避免一直占用cpu preempt_enable_no_resched(); cond_resched(); preempt_disable(); rcu_sched_qs((long)__bind_cpu); } preempt_enable(); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0; wait_to_die: preempt_enable(); /* Wait for kthread_stop */ set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { schedule(); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0;

 

tasklet

由於軟中斷必須使用可重入函數,這就導致設計上的復雜度變高,作為設備驅動程序的開發者來說,增加了負擔。而如果某種應用並不需要在多個CPU上並行執行,那么軟中斷其實是沒有必要的。因此誕生了彌補以上兩個要求的tasklet。它具有以下特性:
a)一種特定類型的tasklet只能運行在一個CPU上,不能並行,只能串行執行。
b)多個不同類型的tasklet可以並行在多個CPU上。
c)軟中斷是靜態分配的,在內核編譯好之后,就不能改變。但tasklet就靈活許多,可以在運行時改變(比如添加模塊時)。
tasklet是在兩種軟中斷類型的基礎上實現的,因此如果不需要軟中斷的並行特性,tasklet就是最好的選擇。也就是說tasklet是軟中斷的一種特殊用法,即延遲情況下的串行執行

相關數據結構

  • tasklet描述符
struct tasklet_struct { struct tasklet_struct *next;//將多個tasklet鏈接成單向循環鏈表 unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution) TASKLET_STATE_RUN(Tasklet is running (SMP only)) atomic_t count;//0:激活tasklet 非0:禁用tasklet void (*func)(unsigned long); //用戶自定義函數 unsigned long data; //函數入參 };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • tasklet鏈表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);//低優先級 static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);//高優先級
  • 1
  • 2

相關API

  • 定義tasklet
#define DECLARE_TASKLET(name, func, data) \ struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data } //定義名字為name的非激活tasklet #define DECLARE_TASKLET_DISABLED(name, func, data) \ struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data } //定義名字為name的激活tasklet void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data) //動態初始化tasklet
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • tasklet操作
static inline void tasklet_disable(struct tasklet_struct *t) //函數暫時禁止給定的tasklet被tasklet_schedule調度,直到這個tasklet被再次被enable;若這個tasklet當前在運行, 這個函數忙等待直到這個tasklet退出 static inline void tasklet_enable(struct tasklet_struct *t) //使能一個之前被disable的tasklet;若這個tasklet已經被調度, 它會很快運行。tasklet_enable和tasklet_disable必須匹配調用, 因為內核跟蹤每個tasklet的"禁止次數" static inline void tasklet_schedule(struct tasklet_struct *t) //調度 tasklet 執行,如果tasklet在運行中被調度, 它在完成后會再次運行; 這保證了在其他事件被處理當中發生的事件受到應有的注意. 這個做法也允許一個 tasklet 重新調度它自己 tasklet_hi_schedule(struct tasklet_struct *t) //和tasklet_schedule類似,只是在更高優先級執行。當軟中斷處理運行時, 它處理高優先級 tasklet 在其他軟中斷之前,只有具有低響應周期要求的驅動才應使用這個函數, 可避免其他軟件中斷處理引入的附加周期. tasklet_kill(struct tasklet_struct *t) //確保了 tasklet 不會被再次調度來運行,通常當一個設備正被關閉或者模塊卸載時被調用。如果 tasklet 正在運行, 這個函數等待直到它執行完畢。若 tasklet 重新調度它自己,則必須阻止在調用 tasklet_kill 前它重新調度它自己,如同使用 del_timer_sync 

 

實現原理

  • 調度原理
static inline void tasklet_schedule(struct tasklet_struct *t) { if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t); } void __tasklet_schedule(struct tasklet_struct *t) { unsigned long flags; local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next);//加入低優先級列表 raise_softirq_irqoff(TASKLET_SOFTIRQ);//觸發軟中斷 local_irq_restore(flags); }

 

  • tasklet執行過程
    TASKLET_SOFTIRQ對應執行函數為tasklet_action,HI_SOFTIRQ為tasklet_hi_action,以tasklet_action為例說明,tasklet_hi_action大同小異。
static void tasklet_action(struct softirq_action *a) { struct tasklet_struct *list; local_irq_disable(); list = __get_cpu_var(tasklet_vec).head; __get_cpu_var(tasklet_vec).head = NULL; __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;//取得tasklet鏈表 local_irq_enable(); while (list) { struct tasklet_struct *t = list; list = list->next; if (tasklet_trylock(t)) { if (!atomic_read(&t->count)) { //執行tasklet if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); t->func(t->data); tasklet_unlock(t); continue; } tasklet_unlock(t); } //如果t->count的值不等於0,說明這個tasklet在調度之后,被disable掉了,所以會將tasklet結構體重新放回到tasklet_vec鏈表,並重新調度TASKLET_SOFTIRQ軟中斷,在之后enable這個tasklet之后重新再執行它 local_irq_disable(); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); } }

 

tasklet執行流程

工作隊列

從上面的介紹看以看出,軟中斷運行在中斷上下文中,因此不能阻塞和睡眠,而tasklet使用軟中斷實現,當然也不能阻塞和睡眠。但如果某延遲處理函數需要睡眠或者阻塞呢?沒關系工作隊列就可以如您所願了。
把推后執行的任務叫做工作(work),描述它的數據結構為work_struct ,這些工作以隊列結構組織成工作隊列(workqueue),其數據結構為workqueue_struct ,而工作線程就是負責執行工作隊列中的工作。系統默認的工作者線程為events。
工作隊列(work queue)是另外一種將工作推后執行的形式。工作隊列可以把工作推后,交由一個內核線程去執行—這個下半部分總是會在進程上下文執行,但由於是內核線程,其不能訪問用戶空間。最重要特點的就是工作隊列允許重新調度甚至是睡眠
通常,在工作隊列和軟中斷/tasklet中作出選擇非常容易。可使用以下規則:
- 如果推后執行的任務需要睡眠,那么只能選擇工作隊列。
- 如果推后執行的任務需要延時指定的時間再觸發,那么使用工作隊列,因為其可以利用timer延時(內核定時器實現)。
- 如果推后執行的任務需要在一個tick之內處理,則使用軟中斷或tasklet,因為其可以搶占普通進程和內核線程,同時不可睡眠。
- 如果推后執行的任務對延遲的時間沒有任何要求,則使用工作隊列,此時通常為無關緊要的任務。
實際上,工作隊列的本質就是將工作交給內核線程處理,因此其可以用內核線程替換。但是內核線程的創建和銷毀對編程者的要求較高,而工作隊列實現了內核線程的封裝,不易出錯,所以我們也推薦使用工作隊列。

相關數據結構

  • 正常工作結構體
struct work_struct { atomic_long_t data; //傳遞給工作函數的參數 #define WORK_STRUCT_PENDING 0 /* T if work item pending execution */ #define WORK_STRUCT_FLAG_MASK (3UL) #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK) struct list_head entry; //鏈表結構,鏈接同一工作隊列上的工作。 work_func_t func; //工作函數,用戶自定義實現 #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif }; //工作隊列執行函數的原型: void (*work_func_t)(struct work_struct *work); //該函數會由一個工作者線程執行,因此其在進程上下文中,可以睡眠也可以中斷。但只能在內核中運行,無法訪問用戶空間。
  • 延遲工作結構體(延遲的實現是在調度時延遲插入相應的工作隊列)
struct delayed_work { struct work_struct work; struct timer_list timer; //定時器,用於實現延遲處理 };
  • 工作隊列結構體
struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq; //指針數組,其每個元素為per-cpu的工作隊列 struct list_head list; const char *name; int singlethread; //標記是否只創建一個工作者線程 int freezeable; /* Freeze threads during suspend */ int rt; #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif };
  • 每cpu工作隊列(每cpu都對應一個工作者線程worker_thread)
struct cpu_workqueue_struct { spinlock_t lock; struct list_head worklist; wait_queue_head_t more_work; struct work_struct *current_work; struct workqueue_struct *wq; struct task_struct *thread; } ____cacheline_aligned;

相關API

  • 缺省工作隊列
靜態創建 
DECLARE_WORK(name,function); //定義正常執行的工作項 DECLARE_DELAYED_WORK(name,function);//定義延后執行的工作項 動態創建 INIT_WORK(_work, _func) //創建正常執行的工作項 INIT_DELAYED_WORK(_work, _func)//創建延后執行的工作項 調度默認工作隊列 int schedule_work(struct work_struct *work) //對正常執行的工作進行調度,即把給定工作的處理函數提交給缺省的工作隊列和工作者線程。工作者線程本質上是一個普通的內核線程,在默認情況下,每個CPU均有一個類型為“events”的工作者線程,當調用schedule_work時,這個工作者線程會被喚醒去執行工作鏈表上的所有工作。 系統默認的工作隊列名稱是:keventd_wq,默認的工作者線程叫:events/n,這里的n是處理器的編號,每個處理器對應一個線程。比如,單處理器的系統只有events/0這樣一個線程。而雙處理器的系統就會多一個events/1線程。 默認的工作隊列和工作者線程由內核初始化時創建: start_kernel()-->rest_init-->do_basic_setup-->init_workqueues 調度延遲工作 int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay) 刷新缺省工作隊列 void flush_scheduled_work(void) //此函數會一直等待,直到隊列中的所有工作都被執行。 取消延遲工作 static inline int cancel_delayed_work(struct delayed_work *work) //flush_scheduled_work並不取消任何延遲執行的工作,因此,如果要取消延遲工作,應該調用cancel_delayed_work。

以上均是采用缺省工作者線程來實現工作隊列,其優點是簡單易用,缺點是如果缺省工作隊列負載太重,執行效率會很低,這就需要我們創建自己的工作者線程和工作隊列。

  • 自定義工作隊列
create_workqueue(name) 
//宏定義 返回值為工作隊列,name為工作線程名稱。創建新的工作隊列和相應的工作者線程,name用於該內核線程的命名。 int queue_work(struct workqueue_struct *wq, struct work_struct *work) //類似於schedule_work,區別在於queue_work把給定工作提交給創建的工作隊列wq而不是缺省隊列。 int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay) //調度延遲工作。 void flush_workqueue(struct workqueue_struct *wq) //刷新指定工作隊列。 void destroy_workqueue(struct workqueue_struct *wq) //釋放創建的工作隊列。

實現原理

  1. 工作隊列的組織結構
    即workqueue_struct、cpu_workqueue_struct與work_struct的關系。
    一個工作隊列對應一個work_queue_struct,工作隊列中每cpu的工作隊列由cpu_workqueue_struct表示,而work_struct為其上的具體工作。
    關系如下圖所示:
    work_queue結構體關系
    2.工作隊列的工作過程
    工作隊列工作過程
  2. 應用實例
    linux各個接口的狀態(up/down)的消息需要通知netdev_chain上感興趣的模塊同時上報用戶空間消息。這里使用的就是工作隊列。
    具體流程圖如下所示:
    workqueue實例

    1. 是否處於中斷中在Linux中是通過preempt_count來判斷的,具體如下: 在linux系統的進程數據結構里,有這么一個數據結構:
      #define preempt_count() (current_thread_info()->preempt_count)
      利用preempt_count可以表示是否處於中斷處理或者軟件中斷處理過程中,如下所示:
      # define hardirq_count() (preempt_count() & HARDIRQ_MASK)
      #define softirq_count() (preempt_count() & SOFTIRQ_MASK)
      #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK))
      #define in_irq() (hardirq_count())
      #define in_softirq() (softirq_count())
      #define in_interrupt() (irq_count())
      preempt_cout各bit位
      preempt_count的8~23位記錄中斷處理和軟件中斷處理過程的計數。如果有計數,表示系統在硬件中斷或者軟件中斷處理過程中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM