xenomai信號
上篇文章講了linux的信號在內核的發送與處理流程,現在加入了cobalt核,Cobalt內核為xenomai線程提供了信號機制。下面一一解析xenomai內核的信號處理機制。
1 雙核下的信號分類
我們已經知道,每個用戶空間的xenomai線程在內核空間都有兩個調度實體,一是在linux內核中的task_struct,另一個是稱為linux空間的一個影子(shadow)的xnthread。它們表示的是同一個線程,linux調度的是task_struct,實時核cobalt調度的是xnthread。可通過在兩個調度器間遷移的方式來讓xenomai線程在linux核和Cobalt核上運行。
linux進程與線程的信號就讓人頭疼的了,再來一個xenomai信號豈不更復雜?其實不復雜,只需要分清三種信號及其作用域就OK,如圖所示,進程A內有兩個實時線程和一個普通線程,進程B內有一個實時線程和一個普通線程,它們之間的信號分為兩類:
- 使用linux信號機制:進程間信號、linux線程間信號;
- 使用xenomai信號機制:只存在於xenomai線程間,xenomai內的任何信號都不會傳播到linux進程空間,也不會導致進程退出;
由於一個xenomai線程它既是linux任務也是cobalt任務,不同的信號產生和處理在不同的內核中。對於linux信號,由linux調度器(linux內核)發送和接收處理;xenomai信號由xenomai線程通過cobalt內核接口發送或接收,可用於同步互斥(可與信號量semaphore對比),對於一個xenomai應用中的linux信號:
- xenomai線程通過
__STD(kill/pthread_kill)
通過linux發送的信號,調用時會自動遷移到linux內核(root域)再發送;(__STD()
宏表示顯式調用Linux標准庫函數) - xenomai線程接收linux信號處理時也是一樣,必須遷移到linux內核才能處理;
對於linux向xenomai發送信號,例如我們終端中啟動一個xenomai任務后,通過鍵入ctrl+c結束xenomai任務的操作,linux在查找處理該信號的任務時,如果需要處理信號的是一個實時任務,會把xenomai任務遷移到linux核上,再按linux的處理那套流程去處理就行。下面們看它是怎么處理的,與上一篇文章linux下的信號處理流程對比,其中其中不一樣的步驟如下,其余的與linux處理方式一致。
- linux進程或者 shell 發送一個信號給xenomai線程A,可以調用 kill,tkill,tgkill,rt_sigqueueinfo
- 四個發送信號的函數,在內核中最終都是調用 do_send_sig_info
do_send_sig_info
調用send_signal
給xenomai任務A 發送一個信號,其實就是找到 A 的task_struct
,不可靠信號加入信號集合,可靠信號,加入信號鏈表。然后調用complete_signal()
處理信號。complete_signal()
調用signal_wake_up()->signal_wake_up_state()
喚醒A。
雙核下,xenomai在signal_wake_up_state函數中插入了檢測代碼如下。
void signal_wake_up_state(struct task_struct *t, unsigned int state)
{
set_tsk_thread_flag(t, TIF_SIGPENDING);
/* TIF_SIGPENDING must be prior to reporting.TIF_SIGPENDING */
__ipipe_report_sigwake(t);
if (!wake_up_state(t, state | TASK_INTERRUPTIBLE))
kick_process(t);
}
插入代碼__ipipe_report_sigwake(t)
,__ipipe_report_sigwake()
調用__ipipe_notify_kevent()
發出一個內核間信號事件IPIPE_KEVT_SIGWAKE
,__ipipe_notify_kevent
調用Cobalt內核的ipipe_kevent_hook來接收這些事件。
int ipipe_kevent_hook(int kevent, void *data)
{
int ret;
switch (kevent) {
case IPIPE_KEVT_SCHEDULE:
ret = handle_schedule_event(data);/**/
break;
case IPIPE_KEVT_SIGWAKE:
ret = handle_sigwake_event(data);///IPIPE_KEVT_SIGWAKE
break;
......
default:
ret = KEVENT_PROPAGATE;
}
return ret;
}
ipipe_kevent_hook中根據事件類型執行handle_sigwake_event。
static int handle_sigwake_event(struct task_struct *p)
{
struct xnthread *thread;
sigset_t pending;
spl_t s;
thread = xnthread_from_task(p);
......
xnlock_get_irqsave(&nklock, s);
......
if (xnthread_test_state(thread, XNRELAX)) {
xnlock_put_irqrestore(&nklock, s);
return KEVENT_PROPAGATE;
}
......
if (p->state & (TASK_INTERRUPTIBLE|TASK_UNINTERRUPTIBLE))
cobalt_set_task_state(p, p->state | TASK_NOWAKEUP);
__xnthread_kick(thread);
xnsched_run();
xnlock_put_irqrestore(&nklock, s);
return KEVENT_PROPAGATE;
}
handle_sigwake_event()
中的邏輯很簡單,先看A是運行在root域還是haed域,如果本來就在root域(處於XNRELAX狀態),即在linux核上調度,那么不用做什么操作,可直接處理信號;如果A現在是head域調度,先看看它是不是可中斷睡眠狀態(TASK_INTERRUPTIBLE|TASK_UNINTERRUPTIBLE),然后調用__xnthread_kick()
將任務A踢出haed域。最后調用xnsched_run
將CPU讓給linux調度器以盡快喚醒任務A進行信號處理。后面的處理與linux一致。
對於xenomai向linux發送信號,需要在xenomai任務代碼內顯性調用函數kill()或pthread_kill()
發送。且必須通過__STD()
修飾kill()
函數,編譯時才會直接使用glibc的kill函數,對pthread_kill也是一樣。不加修飾的kill()或pthread_kill()函數會在編譯時默認鏈接到libcobalt定義的函數。
COBALT_IMPL(int, kill, (pid_t pid, int sig))
{
int ret;
if (pid <= 0)
return __STD(kill(pid, sig));
ret = XENOMAI_SYSCALL2(sc_cobalt_kill, pid, sig);
if (ret) {
if (ret == -ESRCH)
return __STD(kill(pid, sig));
....
}
return 0;
}
COBALT_IMPL(int, pthread_kill, (pthread_t thread, int sig))
{
int ret;
ret = -XENOMAI_SYSCALL2(sc_cobalt_thread_kill, thread, sig);
if (ret == ESRCH)
return __STD(pthread_kill(thread, sig));
return ret;
}
兩個函數都是先嘗試讓xenomai內核處理,在xenomai內核最終都會調用__cobalt_kill()
;如果該pid不是xenomai線程,才會轉而調用glibc的kill函數,通過linux內核處理。
總之,使用linux信號的操作,不管是實時還是非實時都必須在linux調度器上運行才能完成操作。
2 xenomai信號
xenomai線程間的信號處理機制由xenomai內核實現,與linux線程信號類似,但沒有linux線程信號那么復雜,它。
既然是xenomai線程間的,那就要類似的像linux那樣實現xenomai內核的一套信號管理機制。首先是每個xenomai線程的內核管理結構cobalt_thread里面關於信號處理的字段。
struct cobalt_process {
......
struct list_head sigwaiters;
......
};
struct cobalt_thread {
......
struct xnthread threadbase;
struct cobalt_process *process;
......
/** Signal management. */
sigset_t sigpending;
struct list_head sigqueues[_NSIG]; /* in cobalt_sigpending */
struct xnsynch sigwait;
struct list_head signext;
......
};
sigpending
表示哪些信號尚等待處理(未決),這里只是表示某個信號待處理,該信號具體有多少個需要看sigqueues[]
;sigqueues[]
信號隊列,它的大小是_NSIG,也就是說每個信號都有個隊列。sigwait
一個資源同步對象(xnsynch)表示,我們在13.2 資源同步對象—xnsynch小節解析了xnsynch是干什么用的,信號也是一種資源,所以這里用來等待一個信號資源,當用戶調用sigwait系統調用等待一個信號的時候,就會在sigwait
上睡眠等待信號。signext
在sigwait時用來加入cobalt_process中的sigwaiters鏈表。
xenomai的信號與linux一致,1-31號信號不支持排隊,31-64號信號支持排隊。
需要注意的是xenomai內的信號除與linux一致外還有幾個特有的信號,這些信號是不公開的,僅存在於xenomai內核與libcobalt內部;這些信號無法屏蔽,不能進行排隊,也無法將它們設置為信號集。
/*\arch\x86\include\uapi\asm\signal.h*/
#define SIGRTMIN 32
#define SIGRTMAX _NSIG
/*\include\xenomai\cobalt\uapi\signal.h*/
#define SIGSUSP (SIGRTMAX + 1)
#define SIGRESM (SIGRTMAX + 2)
#define SIGRELS (SIGRTMAX + 3)
#define SIGKICK (SIGRTMAX + 4)
#define SIGDEMT (SIGRTMAX + 5)
SIGSUSP
對指定線程suspend操作;SIGRESM
對指定線程resume操作;SIGRELS
對指定線程解阻塞;SIGKICK
迫使指定線程退出主模式;SIGDEMT
將指定線程降級為非實時線程,該影子線程仍可訪問Xenomai資源,但不再競爭實時調度。
xenomai信號相關接口,由libcobalt實現。如下
int sigwaitinfo(const sigset_t *set, siginfo_t *si);
int sigwait(const sigset_t *set, int *sig);
int sigtimedwait (const sigset_t *set, siginfo_t *si,
const struct timespec *timeout);
int sigpending(sigset_t *set);
int kill(pid_t pid, int sig);
int sigqueue(pid_t pid, int sig, const union sigval value);
int pthread_kill(pthread_t thread, int sig);
2.1 xenomai發送信號
kill、pthread_kill用於發送xenomai信號,注意:不使用__STD()
宏修飾的posix函數默認鏈接到實時內核庫libcobalt。libcobalt中定義如下:
/*lib\cobalt\signal.c*/
COBALT_IMPL(int, kill, (pid_t pid, int sig))
{
int ret;
if (pid <= 0)
return __STD(kill(pid, sig));
ret = XENOMAI_SYSCALL2(sc_cobalt_kill, pid, sig);
if (ret) {
if (ret == -ESRCH)
return __STD(kill(pid, sig));
....
}
return 0;
}
/*lib\cobalt\thread.c*/
COBALT_IMPL(int, pthread_kill, (pthread_t thread, int sig))
{
int ret;
ret = -XENOMAI_SYSCALL2(sc_cobalt_thread_kill, thread, sig);
if (ret == ESRCH)
return __STD(pthread_kill(thread, sig));
return ret;
}
先通過系統調用讓xenomai內核處理,每個用戶線程在內核都是一個任務,都具有pid,不管是pid還是pthread_t,最終都會轉換為xenomai線程內核結構cobalt_thread,再對cobalt_thread進行信號相關操作。如果不能轉換說明該pid或pthread_t表示的線程不是一個xenomai線程,就會返回ESRCH。轉而調用glibc的pthread_kill、kill函數進而讓linux去處理。
/*\kernel\xenomai\posix\thread.c*/
COBALT_SYSCALL(thread_kill, conforming,
(unsigned long pth, int sig))
{
struct cobalt_local_hkey hkey;
struct cobalt_thread *thread;
int ret;
spl_t s;
......
hkey.u_pth = pth;
hkey.mm = current->mm;
thread = thread_lookup(&hkey);
if (thread == NULL)
ret = -ESRCH;
else
ret = __cobalt_kill(thread, sig, 0);
.....
return ret;
}
/*\kernel\xenomai\posix\signal.c*/
COBALT_SYSCALL(kill, conforming, (pid_t pid, int sig))
{
struct cobalt_thread *thread;
int ret;
spl_t s;
thread = cobalt_thread_find(pid);/*找到線程*/
if (thread == NULL)
ret = -ESRCH;
else
ret = __cobalt_kill(thread, sig, 1);
return ret;
}
sc_cobalt_kill
系統調用,則是通過pid來找到對應的cobalt_thread,然后調用__cobalt_kill()
。sc_cobalt_thread_kill
系統調用內,將pthread_t作為hashkey,找到該線程的cobalt_thread,最終調用__cobalt_kill()
。
不同的是調用__cobalt_kill
的第三個參數。sc_cobalt_kill
系統調用傳入的是1,表示給線程組發送信號,當thread指向的那個線程沒有等待任何信號時會嘗試發送給同一線程組其他等待該信號的線程;sc_cobalt_thread_kill
系統調用內傳入的是0.當thread指向的那個線程沒有等待任何信號時就不做任何操作直接返回。
int __cobalt_kill(struct cobalt_thread *thread, int sig, int group) /* nklocked, IRQs off */
{
struct cobalt_sigpending *sigp;
int ret = 0;
switch(sig) {
case 0:
/* Check for existence only. */
break;
case SIGSUSP:
xnthread_suspend(&thread->threadbase, XNSUSP,
XN_INFINITE, XN_RELATIVE, NULL);
if (&thread->threadbase == xnthread_current() &&
xnthread_test_info(&thread->threadbase, XNBREAK))
ret = -EINTR;
break;
case SIGRESM:
xnthread_resume(&thread->threadbase, XNSUSP);
goto resched;
case SIGRELS:
xnthread_unblock(&thread->threadbase);
goto resched;
case SIGKICK:
xnthread_kick(&thread->threadbase);
goto resched;
case SIGDEMT:
xnthread_demote(&thread->threadbase);
goto resched;
case 1 ... _NSIG:
sigp = cobalt_signal_alloc(); /*分配一個信號結構體*/
if (sigp) {
sigp->si.si_signo = sig;
sigp->si.si_errno = 0;
sigp->si.si_code = SI_USER;
sigp->si.si_pid = task_pid_nr(current);
sigp->si.si_uid = get_current_uuid();
if (cobalt_signal_send(thread, sigp, group) <= 0)
cobalt_signal_free(sigp);
}
resched:
xnsched_run();
break;
default:
ret = -EINVAL;
}
return ret;
}
xenomai內核中POSIX信號支持排隊,先分配一個cobalt_sigpending。直接分配結構體大小的內存是不可取的,這會影響實時性,xenomai采取的辦法是和xnobject類似,不通過動態內存分配,內核初始化的時候就申請好_NSIG + (SIGRTMAX - SIGRTMIN) * 2
個cobalt_sigpending的內存sigpending_mem,然后將這一個個cobalt_sigpending穿到鏈表sigpending_pool。cobalt_signal_alloc()就是直接從鏈表sigpending_pool上取一個就可以,cobalt_signal_free釋放時再加入鏈表sigpending_pool。
__SIGPOOL_SIZE
大小,為在kernel\xenomai\posix\signal.c
定義如下:
#define __SIGPOOL_SIZE (sizeof(struct cobalt_sigpending) * \
(_NSIG + (SIGRTMAX - SIGRTMIN) * 2))
注意:這說明整個xenomai系統內只有84個cobalt_sigpending,這意味如果濫用xenomai信號,或隨意向其他xenomai線程發送>31號的信號,而這個線程沒有調用sigwait的操作,那么這些cobalt_sigpending會被永遠的排隊在這個線程上,直到線程A退出才會被釋放。這會導致cobalt_sigpending枯竭,進而影響其他使用信號的程序無法正常工作。
上面的cobalt_signal_alloc()是不是BUG?沒有cobalt_signal_alloc()分配失敗的處理邏輯。這樣應用無法知道是否發送成功了。設置cobalt_sigpending的信號編號si.si_signo,信號類型si.si_code,發送者是誰si.si_pid,發送者的uid多少si.si_uid。使用cobalt_signal_send進行發送。
下面看發送函數cobalt_signal_send。
int cobalt_signal_send(struct cobalt_thread *thread,
struct cobalt_sigpending *sigp,
int group)
{ /* nklocked, IRQs off */
struct list_head *sigq;
int sig, ret;
/* Can we deliver this signal immediately?*/
ret = cobalt_signal_deliver(thread, sigp, group);
if (ret)
return ret; /* Yep, done. */
......
sig = sigp->si.si_signo;
sigq = thread->sigqueues + sig - 1;
if (!list_empty(sigq)) {
/* Queue non-rt signals only once. */
if (sig < SIGRTMIN)
return 0;
/* Queue rt signal source only once (SI_TIMER). */
if (!list_empty(&sigp->next))
return 0;
}
sigaddset(&thread->sigpending, sig);
list_add_tail(&sigp->next, sigq);
return 1;
}
我們發送信號給的這個線程,有可能正在阻塞等待信號,先調用cobalt_signal_deliver看它是不是在等待,是的話直接就遞送(deliver)了;
如果不能及時遞送,再將這個信號掛起,以下兩類信號不支持排隊:1.對於小於SIGRTMIN(32)的信號,不支持排隊,只掛起一次。2.如果發送來的sigp是多個SI_TIMER信號(定時器到期信號),也只排隊一次(與linux處理方式一致,應該是posix標准)。排隊就是將這個cobalt_sigpending插入thread->sigqueues[signo-1]鏈表尾。
排隊的信號是不會被內核處理的,直到用調用sigwaitinfo、sigwait、sigtimedwait來消耗他們。
static int cobalt_signal_deliver(struct cobalt_thread *thread,
struct cobalt_sigpending *sigp,
int group)
{ /* nklocked, IRQs off */
struct cobalt_sigwait_context *swc;
struct xnthread_wait_context *wc;
int sig, ret;
sig = sigp->si.si_signo;
XENO_BUG_ON(COBALT, sig < 1 || sig > _NSIG);
if (xnsynch_pended_p(&thread->sigwait)) {
wc = xnthread_get_wait_context(&thread->threadbase);
swc = container_of(wc, struct cobalt_sigwait_context, wc);
if (sigismember(swc->set, sig))
goto deliver;
}
/*
* If that does not work out and we are sending to a thread
* group, try to deliver to any thread from the same process
* waiting for that signal.
*/
if (!group || list_empty(&thread->process->sigwaiters))
return 0;
list_for_each_entry(thread, &thread->process->sigwaiters, signext) {
wc = xnthread_get_wait_context(&thread->threadbase);
swc = container_of(wc, struct cobalt_sigwait_context, wc);
if (sigismember(swc->set, sig))
goto deliver;
}
return 0;
deliver:
cobalt_copy_siginfo(sigp->si.si_code, swc->si, &sigp->si);
.....
xnthread_complete_wait(&swc->wc);
xnsynch_wakeup_one_sleeper(&thread->sigwait); /*喚醒線程*/
list_del(&thread->signext);
cobalt_signal_free(sigp);
return 1;
}
- 先看這個線程是否正在等待信號,並且等待的信號集中包含我們發送的這個信號,就直接遞送
- 否則的話看這個信號是通過kill還是pthread_kill發送的,如果是kill(group 等於1)就在看看線程組內有沒有其他下線程等待這個信號。否則的話遞送不成功返回0,回到cobalt_signal_send將這個信號掛起排隊。
- 遞送過程很簡單,xenomai線程等待信號的時候不是阻塞在sigwait上嗎,直接喚醒它,告訴它哪個信號來了就行了,接着釋放發送時分配的cobalt_sigpending。
2.2 xenomai接收處理信號
線程調用sigwaitinfo()
、sigtimedwait()
和sigwait()
來接收信號,都是posix標准,作用與linux線程一致。同樣,編譯時會鏈接到libcobalt,再由libcobalt發起系統調用。
COBALT_IMPL(int, sigwait, (const sigset_t *set, int *sig))
{
int ret, oldtype;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
ret = -XENOMAI_SYSCALL2(sc_cobalt_sigwait, set, sig);
pthread_setcanceltype(oldtype, NULL);
return ret;
}
COBALT_IMPL(int, sigwaitinfo, (const sigset_t *set, siginfo_t *si))
{
int ret, oldtype;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
ret = XENOMAI_SYSCALL2(sc_cobalt_sigwaitinfo, set, si);
.....
pthread_setcanceltype(oldtype, NULL);
return ret;
}
COBALT_IMPL(int, sigtimedwait, (const sigset_t *set, siginfo_t *si,
const struct timespec *timeout))
{
int ret, oldtype;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);
ret = XENOMAI_SYSCALL3(sc_cobalt_sigtimedwait, set, si, timeout);
......
pthread_setcanceltype(oldtype, NULL);
return ret;
}
發起系統調用進入xenomai內核后最終都是執行signal_wait()。
static int signal_wait(sigset_t *set, xnticks_t timeout,
void __user *u_si,
int (*put_siginfo)(void __user *u_si,
const struct siginfo *si,
int overrun))
{
struct cobalt_sigpending *sigp = NULL;
struct cobalt_sigwait_context swc;
struct cobalt_thread *curr;
int ret, sig, n, overrun;
unsigned long *p, *t, m;
struct siginfo si, *sip;
struct list_head *sigq;
spl_t s;
curr = cobalt_current_thread();
check:
if (sigisemptyset(&curr->sigpending))
goto wait;
p = curr->sigpending.sig; /* pending */
t = set->sig; /* tested */
for (n = 0, sig = 0; n < _NSIG_WORDS; ++n) {
m = *p++ & *t++;
if (m == 0)
continue;
sig = ffz(~m) + n *_NSIG_BPW + 1;
break;
}
if (sig) {
sigq = curr->sigqueues + sig - 1;
......
sigp = list_get_entry(sigq, struct cobalt_sigpending, next);
INIT_LIST_HEAD(&sigp->next); /* Mark sigp as unlinked. */
if (list_empty(sigq))
sigdelset(&curr->sigpending, sig);
sip = &sigp->si;
ret = 0;
goto done;
}
wait:
if (timeout == XN_NONBLOCK) {
ret = -EAGAIN;
goto fail;
}
swc.set = set;
swc.si = &si;
xnthread_prepare_wait(&swc.wc);
list_add_tail(&curr->signext, &curr->process->sigwaiters);
ret = xnsynch_sleep_on(&curr->sigwait, timeout, XN_RELATIVE);
.......
sig = si.si_signo;
sip = &si;
done:
switch (sip->si_code) {
case SI_TIMER:
overrun = cobalt_timer_deliver(sip->si_tid);
break;
case SI_USER:
case SI_MESGQ:
case SI_QUEUE:
overrun = 0;
break;
default:
overrun = sip->si_overrun;
if (overrun)
sip->si_overrun = 0;
}
if (u_si == NULL)
goto out; /* Return signo only. */
ret = put_siginfo(u_si, sip, overrun);// signal_put_siginfo
if (ret)
goto out;
......
out:
.....
if (sigp &&
(void *)sigp >= sigpending_mem &&
(void *)sigp < sigpending_mem + __SIGPOOL_SIZE) {
xnlock_get_irqsave(&nklock, s);
list_add_tail(&sigp->next, &sigpending_pool);
xnlock_put_irqrestore(&nklock, s);
/* no more ref. to sigp beyond this point. */
}
return ret ?: sig;
fail:
return ret;
}
- 先檢查curr->sigpending 是否有未決的信號,如果有的話就直接跳轉到標簽done處理。
- 否則往下進入wait標簽,睡眠到curr->sigwait。直到超時或等待的信號到來才會繼續往下執行done操作。
- done中處理一下timer超期信號,如果需要拷貝sifinfo,則調用put_siginfo拷貝一下。
- 執行out操作釋放等待到的cobalt_sigpending。
3 雙核信號總結
-
兩種信號:xenomai信號和linux信號。
-
理清雙核下的信號需要分清:Linux進程與線程、xenomai線程三者之間的關系及作用域。
-
linux的進程與線程都有信號屏蔽集,xenomai信號則沒有。
-
雙核應用代碼中使用函數發送linux信號時,最好使用
__STD()
修飾信號相關函數。 -
如果不顯式調用接口
sigwaitinfo()
、sigtimedwait()
和sigwait()
來接收信號處理xenomai信號,那么永遠不會得到處理。所以不能濫用xenomai信號,因為信號有限。 -
xenomai內核將信號作為一種同步資源(xnsynch)來管理,知道這一點,並理解了13.2 資源同步對象—xnsynch工作原理,就知道xenomai信號如何工作。
本文為本文為博主原創文章,轉載請注明出處。如有錯誤,歡迎指正。博客地址:https://www.cnblogs.com/wsg1100/