-
線程中通信
- 在裸機編程中,經常會使用全局變量進行功能間的通信,如某些功能可能由於一些操作而改變全局變量的值,另一個功能對此全局變量進行讀取,根據讀取到的全局變量值執行相應的動作,達到通信協作的目的;
郵箱
- 郵箱服務是實時操作系統中一種典型的線程間通信方法。舉一個簡單的例子,有兩個線程,線程 1 檢測按鍵狀態並發送,線程 2 讀取按鍵狀態並根據按鍵的狀態相應地改變 LED 的亮滅。這里就可以使用郵箱的方式進行通信,線程 1 將按鍵的狀態作為郵件發送到郵箱,線程 2 在郵箱中讀取郵件獲得按鍵狀態並對 LED 執行亮滅操作。
郵箱的工作機制
- RT-Thread 操作系統的郵箱用於線程間通信,特點是開銷比較低,效率較高;
- 郵箱中的每一封郵件只能容納固定的 4 字節內容(針對 32 位處理系統,指針的大小即為 4 個字節,所以一封郵件恰好能夠容納一個指針)。典型的郵箱也稱作交換消息,如下圖所示,線程或中斷服務例程把一封 4 字節長度的郵件發送到郵箱中,而一個或多個線程可以從郵箱中接收這些郵件並進行處理。
- 非阻塞方式的郵件發送過程能夠安全的應用於中斷服務中,是線程、中斷服務、定時器向線程發送消息的有效手段。
- 通常來說,郵件收取過程可能是阻塞的,這取決於郵箱中是否有郵件,以及收取郵件時設置的超時時間。當郵箱中不存在郵件且超時時間不為 0 時,郵件收取過程將變成阻塞方式。在這類情況下,只能由線程進行郵件的收取。
-
當一個線程向郵箱發送郵件時,如果郵箱沒滿,將把郵件復制到郵箱中。如果郵箱已經滿了,發送線程可以設置超時時間,選擇等待掛起或直接返回 - RT_EFULL。如果發送線程選擇掛起等待,那么當郵箱中的郵件被收取而空出空間來時,等待掛起的發送線程將被喚醒繼續發送。
-
當一個線程從郵箱中接收郵件時,如果郵箱是空的,接收線程可以選擇是否等待掛起直到收到新的郵件而喚醒,或可以設置超時時間。當達到設置的超時時間,郵箱依然未收到郵件時,這個選擇超時等待的線程將被喚醒並返回 - RT_ETIMEOUT。如果郵箱中存在郵件,那么接收線程將復制郵箱中的 4 個字節郵件到接收緩存中。
郵箱控制塊
- 在 RT-Thread 中,郵箱控制塊是操作系統用於管理郵箱的一個數據結構,由結構體 struct rt_mailbox 表示。rt_mailbox 對象從 rt_ipc_object 中派生,由 IPC 容器所管理,郵箱控制塊結構的詳細定義請見以下代碼:
-
struct rt_mailbox { struct rt_ipc_object parent; /**< inherit from ipc_object */ rt_uint32_t *msg_pool; /**< start address of message buffer */ rt_uint16_t size; /**< size of message pool */ rt_uint16_t entry; /**< index of messages in msg_pool */ rt_uint16_t in_offset; /**< input offset of the message buffer */ rt_uint16_t out_offset; /**< output offset of the message buffer */ rt_list_t suspend_sender_thread; /**< sender thread suspended on this mailbox */ }; typedef struct rt_mailbox *rt_mailbox_t;
郵箱的管理方式
- 郵箱控制塊是一個結構體,其中含有事件相關的重要參數,在郵箱的功能實現中起重要的作用。郵對一個郵箱的操作包含:創建 / 初始化郵箱、發送郵件、接收郵件、刪除 / 脫離郵箱;
創建和刪除郵箱
- 動態創建一個郵箱對象可以調用如下的函數接口:
-
/** * This function will create a mailbox object from system resource * * @param name the name of mailbox * @param size the size of mailbox * @param flag the flag of mailbox * * @return the created mailbox, RT_NULL on error happen */ rt_mailbox_t rt_mb_create(const char *name, rt_size_t size, rt_uint8_t flag)
-
創建郵箱對象時會先從對象管理器中分配一個郵箱對象,然后給郵箱動態分配一塊內存空間用來存放郵件,這塊內存的大小等於郵件大小(4 字節)與郵箱容量的乘積,接着初始化接收郵件數目和發送郵件在郵箱中的偏移量;
- 函數入口參數flag:
-
#define RT_IPC_FLAG_FIFO 0x00 /**< FIFOed IPC. @ref IPC. */ #define RT_IPC_FLAG_PRIO 0x01 /**< PRIOed IPC. @ref IPC. */
- 當用 rt_mb_create() 創建的郵箱不再被使用時,應該刪除它來釋放相應的系統資源,一旦操作完成,郵箱將被永久性的刪除;
- 刪除郵箱時,如果有線程被掛起在該郵箱對象上,內核先喚醒掛起在該郵箱上的所有線程(線程返回值是 - RT_ERROR),然后再釋放郵箱使用的內存,最后刪除郵箱對象。
-
/** * This function will delete a mailbox object and release the memory * * @param mb the mailbox object * * @return the error code */ rt_err_t rt_mb_delete(rt_mailbox_t mb)
初始化和脫離郵箱
- 初始化郵箱跟創建郵箱類似,只是初始化郵箱用於靜態郵箱對象的初始化。與創建郵箱不同的是,靜態郵箱對象的內存是在系統編譯時由編譯器分配的,一般放於讀寫數據段或未初始化數據段中,其余的初始化工作與創建郵箱時相同。
- 如果 msgpool 指向的緩沖區的字節數是 N,那么郵箱容量應該是 N/4。
-
/** * This function will initialize a mailbox and put it under control of resource * management. * * @param mb the mailbox object * @param name the name of mailbox * @param msgpool the begin address of buffer to save received mail * @param size the size of mailbox * @param flag the flag of mailbox * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mb_init(rt_mailbox_t mb, const char *name, void *msgpool, rt_size_t size, rt_uint8_t flag)
-
脫離郵箱將把靜態初始化的郵箱對象從內核對象管理器中脫離;
- 使用該函數接口后,內核先喚醒所有掛在該郵箱上的線程(線程獲得返回值是 - RT_ERROR),然后將該郵箱對象從內核對象管理器中脫離。
-
/** * This function will detach a mailbox from resource management * * @param mb the mailbox object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mb_detach(rt_mailbox_t mb)
發送郵件
- 線程或者中斷服務程序可以通過郵箱給其他線程發送郵件;
- 發送的郵件可以是 32 位任意格式的數據,一個整型值或者一個指向緩沖區的指針。當郵箱中的郵件已經滿時,發送郵件的線程或者中斷程序會收到 -RT_EFULL 的返回值;
-
/** * This function will send a mail to mailbox object, if there are threads * suspended on mailbox object, it will be waked up. This function will return * immediately, if you want blocking send, use rt_mb_send_wait instead. * * @param mb the mailbox object * @param value the mail * * @return the error code */ rt_err_t rt_mb_send(rt_mailbox_t mb, rt_uint32_t value)
等待方式發送郵件
- 用戶也可以通過如下的函數接口向指定郵箱發送郵件;
- rt_mb_send_wait() 與 rt_mb_send() 的區別在於有等待時間,如果郵箱已經滿了,那么發送線程將根據設定的 timeout 參數等待郵箱中因為收取郵件而空出空間。如果設置的超時時間到達依然沒有空出空間,這時發送線程將被喚醒並返回錯誤碼;
-
/** * This function will send a mail to mailbox object. If the mailbox is full, * current thread will be suspended until timeout. * * @param mb the mailbox object * @param value the mail * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mb_send_wait(rt_mailbox_t mb, rt_uint32_t value, rt_int32_t timeout)
接收郵件
- 只有當接收者接收的郵箱中有郵件時,接收者才能立即取到郵件並返回 RT_EOK 的返回值,否則接收線程會根據超時時間設置,或掛起在郵箱的等待線程隊列上,或直接返回;
- 接收郵件時,接收者需指定接收郵件的郵箱句柄,並指定接收到的郵件存放位置以及最多能夠等待的超時時間。如果接收時設定了超時,當指定的時間內依然未收到郵件時,將返回 - RT_ETIMEOUT;
-
/** * This function will receive a mail from mailbox object, if there is no mail * in mailbox object, the thread shall wait for a specified time. * * @param mb the mailbox object * @param value the received mail will be saved in * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mb_recv(rt_mailbox_t mb, rt_uint32_t *value, rt_int32_t timeout)
郵箱使用示例
- 初始化 2 個靜態線程,一個靜態的郵箱對象,其中一個線程往郵箱中發送郵件,一個線程往郵箱中收取郵件。
-
#include <rtthread.h> #define THREAD_PRIORITY 10 #define THREAD_TIMESLICE 5 /* 郵箱控制塊 */ static struct rt_mailbox mb; /* 用於放郵件的內存池 */ static char mb_pool[128]; static char mb_str1[] = "I'm a mail!"; static char mb_str2[] = "this is another mail!"; static char mb_str3[] = "over"; ALIGN(RT_ALIGN_SIZE) static char thread1_stack[1024]; static struct rt_thread thread1; /* 線程 1 入口 */ static void thread1_entry(void *parameter) { char *str; while (1) { rt_kprintf("thread1: try to recv a mail\n"); /* 從郵箱中收取郵件 */ if (rt_mb_recv(&mb, (rt_uint32_t *)&str, RT_WAITING_FOREVER) == RT_EOK) { rt_kprintf("thread1: get a mail from mailbox, the content:%s\n", str); if (str == mb_str3) break; /* 延時 100ms */ rt_thread_mdelay(100); } } /* 執行郵箱對象脫離 */ rt_mb_detach(&mb); } ALIGN(RT_ALIGN_SIZE) static char thread2_stack[1024]; static struct rt_thread thread2; /* 線程 2 入口 */ static void thread2_entry(void *parameter) { rt_uint8_t count; count = 0; while (count < 10) { count ++; if (count & 0x1) { /* 發送 mb_str1 地址到郵箱中 */ rt_mb_send(&mb, (rt_uint32_t)&mb_str1); } else { /* 發送 mb_str2 地址到郵箱中 */ rt_mb_send(&mb, (rt_uint32_t)&mb_str2); } /* 延時 200ms */ rt_thread_mdelay(200); } /* 發送郵件告訴線程 1,線程 2 已經運行結束 */ rt_mb_send(&mb, (rt_uint32_t)&mb_str3); } int mailbox_sample(void) { rt_err_t result; /* 初始化一個 mailbox */ result = rt_mb_init(&mb, "mbt", /* 名稱是 mbt */ &mb_pool[0], /* 郵箱用到的內存池是 mb_pool */ sizeof(mb_pool) / 4, /* 郵箱中的郵件數目,因為一封郵件占 4 字節 */ RT_IPC_FLAG_FIFO); /* 采用 FIFO 方式進行線程等待 */ if (result != RT_EOK) { rt_kprintf("init mailbox failed.\n"); return -1; } rt_thread_init(&thread1, "thread1", thread1_entry, RT_NULL, &thread1_stack[0], sizeof(thread1_stack), THREAD_PRIORITY, THREAD_TIMESLICE); rt_thread_startup(&thread1); rt_thread_init(&thread2, "thread2", thread2_entry, RT_NULL, &thread2_stack[0], sizeof(thread2_stack), THREAD_PRIORITY, THREAD_TIMESLICE); rt_thread_startup(&thread2); return 0; } /* 導出到 msh 命令列表中 */ MSH_CMD_EXPORT(mailbox_sample, mailbox sample);
-
運行結果:
-
\ | / - RT - Thread Operating System / | \ 3.1.0 build Aug 27 2018 2006 - 2018 Copyright by rt-thread team msh >mailbox_sample thread1: try to recv a mail thread1: get a mail from mailbox, the content:I'm a mail! msh >thread1: try to recv a mail thread1: get a mail from mailbox, the content:this is another mail! … thread1: try to recv a mail thread1: get a mail from mailbox, the content:this is another mail! thread1: try to recv a mail thread1: get a mail from mailbox, the content:over
郵箱的使用場合
- 郵箱是一種簡單的線程間消息傳遞方式,特點是開銷比較低,效率較高。在 RT-Thread 操作系統的實現中能夠一次傳遞一個 4 字節大小的郵件,並且郵箱具備一定的存儲功能,能夠緩存一定數量的郵件數 (郵件數由創建、初始化郵箱時指定的容量決定)。郵箱中一封郵件的最大長度是 4 字節,所以郵箱能夠用於不超過 4 字節的消息傳遞。由於在 32 系統上 4 字節的內容恰好可以放置一個指針,因此當需要在線程間傳遞比較大的消息時,可以把指向一個緩沖區的指針作為郵件發送到郵箱中,即郵箱也可以傳遞指針,例如:
-
struct msg { rt_uint8_t *data_ptr; rt_uint32_t data_size; };
-
對於這樣一個消息結構體,其中包含了指向數據的指針 data_ptr 和數據塊長度的變量 data_size。當一個線程需要把這個消息發送給另外一個線程時,可以采用如下的操作:
-
struct msg* msg_ptr; msg_ptr = (struct msg*)rt_malloc(sizeof(struct msg)); msg_ptr->data_ptr = ...; /* 指向相應的數據塊地址 */ msg_ptr->data_size = len; /* 數據塊的長度 */ /* 發送這個消息指針給 mb 郵箱 */ rt_mb_send(mb, (rt_uint32_t)msg_ptr);
- 而在接收線程中,因為收取過來的是指針,而 msg_ptr 是一個新分配出來的內存塊,所以在接收線程處理完畢后,需要釋放相應的內存塊:
-
struct msg* msg_ptr; if (rt_mb_recv(mb, (rt_uint32_t*)&msg_ptr) == RT_EOK) { /* 在接收線程處理完畢后,需要釋放相應的內存塊 */ rt_free(msg_ptr); }
消息隊列
- 消息隊列是另一種常用的線程間通訊方式,是郵箱的擴展。可以應用在多種場合:線程間的消息交換、使用串口接收不定長數據等。
消息隊列的工作機制
- 消息隊列能夠接收來自線程或中斷服務例程中不固定長度的消息,並把消息緩存在自己的內存空間中;
- 其他線程也能夠從消息隊列中讀取相應的消息,而當消息隊列是空的時候,可以掛起讀取線程。當有新的消息到達時,掛起的線程將被喚醒以接收並處理消息。消息隊列是一種異步的通信方式。
- 線程或中斷服務例程可以將一條或多條消息放入消息隊列中。同樣,一個或多個線程也可以從消息隊列中獲得消息。當有多個消息發送到消息隊列時,通常將先進入消息隊列的消息先傳給線程,也就是說,線程先得到的是最先進入消息隊列的消息,即先進先出原則 (FIFO),如下圖所示:
- RT-Thread 操作系統的消息隊列對象由多個元素組成,當消息隊列被創建時,它就被分配了消息隊列控制塊:消息隊列名稱、內存緩沖區、消息大小以及隊列長度等。
- 每個消息隊列對象中包含着多個消息框,每個消息框可以存放一條消息;消息隊列中的第一個和最后一個消息框被分別稱為消息鏈表頭和消息鏈表尾,對應於消息隊列控制塊中的 msg_queue_head 和 msg_queue_tail;
- 有些消息框可能是空的,它們通過 msg_queue_free 形成一個空閑消息框鏈表。所有消息隊列中的消息框總數即是消息隊列的長度,這個長度可在消息隊列創建時指定。
消息隊列控制塊
- 在 RT-Thread 中,消息隊列控制塊是操作系統用於管理消息隊列的一個數據結構,由結構體 struct rt_messagequeue 表示;
-
/** * message queue structure */ struct rt_messagequeue { struct rt_ipc_object parent; /**< inherit from ipc_object */ void *msg_pool; /**< start address of message queue */ rt_uint16_t msg_size; /**< message size of each message */ rt_uint16_t max_msgs; /**< max number of messages */ rt_uint16_t entry; /**< index of messages in the queue */ void *msg_queue_head; /**< list head */ void *msg_queue_tail; /**< list tail */ void *msg_queue_free; /**< pointer indicated the free node of queue */ }; typedef struct rt_messagequeue *rt_mq_t;
-
消息隊列控制塊是一個結構體,其中含有消息隊列相關的重要參數,對一個消息隊列的操作包含:創建消息隊列 - 發送消息 - 接收消息 - 刪除消息隊列。
消息隊列的管理方式
創建和刪除消息隊列
- 消息隊列在使用前,應該先創建出來,創建消息隊列時先從對象管理器中分配一個消息隊列對象,然后給消息隊列對象分配一塊內存空間,組織成空閑消息鏈表,這塊內存的大小 =[消息大小 + 消息頭(用於鏈表連接)的大小]X 消息隊列最大個數,接着再初始化消息隊列,此時消息隊列為空。
-
/** * This function will create a message queue object from system resource * * @param name the name of message queue * @param msg_size the size of message * @param max_msgs the maximum number of message in queue * @param flag the flag of message queue * * @return the created message queue, RT_NULL on error happen */ rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag)
-
當消息隊列不再被使用時,應該刪除它以釋放系統資源,一旦操作完成,消息隊列將被永久性地刪除;
-
/** * This function will delete a message queue object and release the memory * * @param mq the message queue object * * @return the error code */ rt_err_t rt_mq_delete(rt_mq_t mq)
初始化和脫離消息隊列
- 初始化靜態消息隊列對象跟創建消息隊列對象類似,只是靜態消息隊列對象的內存是在系統編譯時由編譯器分配的,一般放於讀數據段或未初始化數據段中。
-
/** * This function will initialize a message queue and put it under control of * resource management. * * @param mq the message object * @param name the name of message queue * @param msgpool the beginning address of buffer to save messages * @param msg_size the maximum size of message * @param pool_size the size of buffer to save messages * @param flag the flag of message queue * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_init(rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_size, rt_size_t pool_size, rt_uint8_t flag)
-
脫離消息隊列將使消息隊列對象被從內核對象管理器中脫離,使用該函數接口后,內核先喚醒所有掛在該消息等待隊列對象上的線程(線程返回值是 -RT_ERROR),然后將該消息隊列對象從內核對象管理器中脫離;
-
/** * This function will detach a message queue object from resource management * * @param mq the message queue object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_detach(rt_mq_t mq)
發送消息
- 線程或者中斷服務程序都可以給消息隊列發送消息。
- 當發送消息時,消息隊列對象先從空閑消息鏈表上取下一個空閑消息塊,把線程或者中斷服務程序發送的消息內容復制到消息塊上,然后把該消息塊掛到消息隊列的尾部;
- 當且僅當空閑消息鏈表上有可用的空閑消息塊時,發送者才能成功發送消息;當空閑消息鏈表上無可用消息塊,說明消息隊列已滿,此時,發送消息的的線程或者中斷程序會收到一個錯誤碼(-RT_EFULL);
-
/** * This function will send a message to message queue object, if there are * threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size)
發送緊急消息
- 發送緊急消息的過程與發送消息幾乎一樣,唯一的不同是,當發送緊急消息時,從空閑消息鏈表上取下來的消息塊不是掛到消息隊列的隊尾,而是掛到隊首,這樣,接收者就能夠優先接收到緊急消息,從而及時進行消息處理。
-
/** * This function will send an urgent message to message queue object, which * means the message will be inserted to the head of message queue. If there * are threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_urgent(rt_mq_t mq, void *buffer, rt_size_t size)
- 當消息隊列中有消息時,接收者才能接收消息,否則接收者會根據超時時間設置,或掛起在消息隊列的等待線程隊列上,或直接返回;
-
/** * This function will receive a message from message queue object, if there is * no message in message queue object, the thread shall wait for a specified * time. * * @param mq the message queue object * @param buffer the received message will be saved in * @param size the size of buffer * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout)
消息隊列使用示例
- 初始化了 2 個靜態線程,一個線程會從消息隊列中收取消息;另一個線程會定時給消息隊列發送普通消息和緊急消息
-
#include <rtthread.h> /* 消息隊列控制塊 */ static struct rt_messagequeue mq; /* 消息隊列中用到的放置消息的內存池 */ static rt_uint8_t msg_pool[2048]; ALIGN(RT_ALIGN_SIZE) static char thread1_stack[1024]; static struct rt_thread thread1; /* 線程 1 入口函數 */ static void thread1_entry(void *parameter) { char buf = 0; rt_uint8_t cnt = 0; while (1) { /* 從消息隊列中接收消息 */ if (rt_mq_recv(&mq, &buf, sizeof(buf), RT_WAITING_FOREVER) == RT_EOK) { rt_kprintf("thread1: recv msg from msg queue, the content:%c\n", buf); if (cnt == 19) { break; } } /* 延時 50ms */ cnt++; rt_thread_mdelay(50); } rt_kprintf("thread1: detach mq \n"); rt_mq_detach(&mq); } ALIGN(RT_ALIGN_SIZE) static char thread2_stack[1024]; static struct rt_thread thread2; /* 線程 2 入口 */ static void thread2_entry(void *parameter) { int result; char buf = 'A'; rt_uint8_t cnt = 0; while (1) { if (cnt == 8) { /* 發送緊急消息到消息隊列中 */ result = rt_mq_urgent(&mq, &buf, 1); if (result != RT_EOK) { rt_kprintf("rt_mq_urgent ERR\n"); } else { rt_kprintf("thread2: send urgent message - %c\n", buf); } } else if (cnt>= 20)/* 發送 20 次消息之后退出 */ { rt_kprintf("message queue stop send, thread2 quit\n"); break; } else { /* 發送消息到消息隊列中 */ result = rt_mq_send(&mq, &buf, 1); if (result != RT_EOK) { rt_kprintf("rt_mq_send ERR\n"); } rt_kprintf("thread2: send message - %c\n", buf); } buf++; cnt++; /* 延時 5ms */ rt_thread_mdelay(5); } } /* 消息隊列示例的初始化 */ int msgq_sample(void) { rt_err_t result; /* 初始化消息隊列 */ result = rt_mq_init(&mq, "mqt", &msg_pool[0], /* 內存池指向 msg_pool */ 1, /* 每個消息的大小是 1 字節 */ sizeof(msg_pool), /* 內存池的大小是 msg_pool 的大小 */ RT_IPC_FLAG_FIFO); /* 如果有多個線程等待,按照先來先得到的方法分配消息 */ if (result != RT_EOK) { rt_kprintf("init message queue failed.\n"); return -1; } rt_thread_init(&thread1, "thread1", thread1_entry, RT_NULL, &thread1_stack[0], sizeof(thread1_stack), 25, 5); rt_thread_startup(&thread1); rt_thread_init(&thread2, "thread2", thread2_entry, RT_NULL, &thread2_stack[0], sizeof(thread2_stack), 25, 5); rt_thread_startup(&thread2); return 0; } /* 導出到 msh 命令列表中 */ MSH_CMD_EXPORT(msgq_sample, msgq sample);
運行結果:
-
\ | / - RT - Thread Operating System / | \ 3.1.0 build Aug 24 2018 2006 - 2018 Copyright by rt-thread team msh > msgq_sample msh >thread2: send message - A thread1: recv msg from msg queue, the content:A thread2: send message - B thread2: send message - C thread2: send message - D thread2: send message - E thread1: recv msg from msg queue, the content:B thread2: send message - F thread2: send message - G thread2: send message - H thread2: send urgent message - I thread2: send message - J thread1: recv msg from msg queue, the content:I thread2: send message - K thread2: send message - L thread2: send message - M thread2: send message - N thread2: send message - O thread1: recv msg from msg queue, the content:C thread2: send message - P thread2: send message - Q thread2: send message - R thread2: send message - S thread2: send message - T thread1: recv msg from msg queue, the content:D message queue stop send, thread2 quit thread1: recv msg from msg queue, the content:E thread1: recv msg from msg queue, the content:F thread1: recv msg from msg queue, the content:G … thread1: recv msg from msg queue, the content:T thread1: detach mq
-
線程 1 會從消息隊列中收取消息;線程 2 定時給消息隊列發送普通消息和緊急消息。由於線程 2 發送消息 “I” 是緊急消息,會直接插入消息隊列的隊首,所以線程 1 在接收到消息 “B” 后,接收的是該緊急消息,之后才接收消息“C”。
消息隊列的使用場合
- 消息隊列可以應用於發送不定長消息的場合,包括線程與線程間的消息交換,以及中斷服務例程中給線程發送消息(中斷服務例程不能接收消息);
發送消息
- 消息隊列和郵箱的明顯不同是消息的長度並不限定在 4 個字節以內;另外,消息隊列也包括了一個發送緊急消息的函數接口;
- 但是當創建的是一個所有消息的最大長度是 4 字節的消息隊列時,消息隊列對象將蛻化成郵箱。這個不限定長度的消息,也及時的反應到了代碼編寫的場合上,同樣是類似郵箱的代碼:
- 和郵箱例子相同的消息結構定義,假設依然需要發送這樣一個消息給接收線程。在郵箱例子中,這個結構只能夠發送指向這個結構的指針
-
struct msg { rt_uint8_t *data_ptr; /* 數據塊首地址 */ rt_uint32_t data_size; /* 數據塊大小 */ };
void send_op(void *data, rt_size_t length) { struct msg msg_ptr; msg_ptr.data_ptr = data; /* 指向相應的數據塊地址 */ msg_ptr.data_size = length; /* 數據塊的長度 */ /* 發送這個消息指針給 mq 消息隊列 */ rt_mq_send(mq, (void*)&msg_ptr, sizeof(struct msg)); }
-
注意,上面的代碼中,是把一個局部變量的數據內容發送到了消息隊列中。在接收線程中,同樣也采用局部變量進行消息接收的結構體:
-
void message_handler() { struct msg msg_ptr; /* 用於放置消息的局部變量 */ /* 從消息隊列中接收消息到 msg_ptr 中 */ if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg)) == RT_EOK) { /* 成功接收到消息,進行相應的數據處理 */ } }
同步消息
- 在一般的系統設計中會經常遇到要發送同步消息的問題,這個時候就可以根據當時狀態的不同選擇相應的實現:兩個線程間可以采用[消息隊列 + 信號量或郵箱]的形式實現。發送線程通過消息發送的形式發送相應的消息給消息隊列,發送完畢后希望獲得接收線程的收到確認,工作示意圖如下圖所示:
- 根據消息的不同可以把消息定義成結構體
-
// 使用郵箱作為確認標志,表示接收線程能夠通知一些狀態值給發送線程 struct msg { /* 消息結構其他成員 */ struct rt_mailbox ack; };
// 使用信號量作為確認標志,只能夠單一的通知發送線程,消息已經確認接收。 struct msg { /* 消息結構其他成員 */ struct rt_semaphore ack; };
信號
- 信號(又稱為軟中斷信號),在軟件層次上是對中斷機制的一種模擬,在原理上,一個線程收到一個信號與處理器收到一個中斷請求可以說是類似的。
信號的工作機制
- 信號在 RT-Thread 中用作異步通信,POSIX 標准定義了 sigset_t 類型來定義一個信號集,然而 sigset_t 類型在不同的系統可能有不同的定義方式,在 RT-Thread 中,將 sigset_t 定義成了 unsigned long 型,並命名為 rt_sigset_t,應用程序能夠使用的信號為 SIGUSR1(10)和 SIGUSR2(12)。
- 信號本質是軟中斷,用來通知線程發生了異步事件,用做線程之間的異常通知、應急處理。一個線程不必通過任何操作來等待信號的到達,事實上,線程也不知道信號到底什么時候到達,線程之間可以互相通過調用 rt_thread_kill() 發送軟中斷信號。
- 收到信號的線程對各種信號有不同的處理方法,處理方法可以分為三類:
-
- 類似中斷的處理程序,對於需要處理的信號,線程可以指定處理函數,由該函數來處理。
- 忽略某個信號,對該信號不做任何處理,就像未發生過一樣。
- 對該信號的處理保留系統的默認值。
- 假設線程 1 需要對信號進行處理,首先線程 1 安裝一個信號並解除阻塞,並在安裝的同時設定了對信號的異常處理方式;然后其他線程可以給線程 1 發送信號,觸發線程 1 對該信號的處理。
- 當信號被傳遞給線程 1 時,如果它正處於掛起狀態,那會把狀態改為就緒狀態去處理對應的信號。如果它正處於運行狀態,那么會在它當前的線程棧基礎上建立新棧幀空間去處理對應的信號,需要注意的是使用的線程棧大小也會相應增加。
信號的管理方式
- 對於信號的操作,有以下幾種:安裝信號、阻塞信號、阻塞解除、信號發送、信號等待。信號的接口詳見下圖:
安裝信號
-
線程要處理某一信號,那么就要在線程中安裝該信號。安裝信號主要用來確定信號值及線程針對該信號值的動作之間的映射關系,即線程將要處理哪個信號,該信號被傳遞給線程時,將執行何種操作。
-
/* * @param signo:信號值(只有 SIGUSR1 和 SIGUSR2 是開放給用戶使用的,下同) * @param handler:設置對信號值的處理方式 * @return: 錯誤的信號:SIG_ERR * 成 功:安裝信號前的 handler 值 */ rt_sighandler_t rt_signal_install(int signo, rt_sighandler_t handler)
-
在信號安裝時設定 handler 參數,決定了該信號的不同的處理方法。處理方法可以分為三種:
-
-
類似中斷的處理方式,參數指向當信號發生時用戶自定義的處理函數,由該函數來處理。
-
參數設為 SIG_IGN,忽略某個信號,對該信號不做任何處理,就像未發生過一樣。
-
參數設為 SIG_DFL,系統會調用默認的處理函數_signal_default_handler()。
-
阻塞信號
- 信號阻塞,也可以理解為屏蔽信號。如果該信號被阻塞,則該信號將不會遞達給安裝此信號的線程,也不會引發軟中斷處理。調 rt_signal_mask() 可以使信號阻塞:
-
/* * @param signo:信號值(只有 SIGUSR1 和 SIGUSR2 是開放給用戶使用的,下同) */ void rt_signal_mask(int signo)
解除信號阻塞
- 線程中可以安裝好幾個信號,使用此函數可以對其中一些信號給予 “關注”,那么發送這些信號都會引發該線程的軟中斷。調用 rt_signal_unmask() 可以用來解除信號阻塞:
-
/* * @param signo:信號值(只有 SIGUSR1 和 SIGUSR2 是開放給用戶使用的,下同) */ void rt_signal_unmask(int signo)
發送信號
- 當需要進行異常處理時,可以給設定了處理異常的線程發送信號,調用 rt_thread_kill() 可以用來向任何線程發送信號:
-
/* * @param tid:接收信號的線程 * @param sig:信號值 * return:失敗:-RT_EINVAL * 成功:RT_EOK */ int rt_thread_kill(rt_thread_t tid, int sig)
等待信號
-
等待 set 信號的到來,如果沒有等到這個信號,則將線程掛起,直到等到這個信號或者等待時間超過指定的超時時間 timeout。如果等到了該信號,則將指向該信號體的指針存入 si;
-
/* * @param set:指定等待的信號 * @param si: 指向存儲等到信號信息的指針 * @param timeout:指定的等待時間 * return: RT_EOK 等到信號 * -RT_ETIMEOUT 超時 * -RT_EINVAL 參數錯誤 */ int rt_signal_wait(const rt_sigset_t *set, rt_siginfo_t *si, rt_int32_t timeout)
信號應用示例
- 創建了 1 個線程,在安裝信號時,信號處理方式設為自定義處理,定義的信號的處理函數為 thread1_signal_handler()。待此線程運行起來安裝好信號之后,給此線程發送信號。此線程將接收到信號,並打印信息;
-
#include <rtthread.h> #define THREAD_PRIORITY 25 #define THREAD_STACK_SIZE 512 #define THREAD_TIMESLICE 5 static rt_thread_t tid1 = RT_NULL; /* 線程 1 的信號處理函數 */ void thread1_signal_handler(int sig) { rt_kprintf("thread1 received signal %d\n", sig); } /* 線程 1 的入口函數 */ static void thread1_entry(void *parameter) { int cnt = 0; /* 安裝信號 */ rt_signal_install(SIGUSR1, thread1_signal_handler); rt_signal_unmask(SIGUSR1); /* 運行 10 次 */ while (cnt < 10) { /* 線程 1 采用低優先級運行,一直打印計數值 */ rt_kprintf("thread1 count : %d\n", cnt); cnt++; rt_thread_mdelay(100); } } /* 信號示例的初始化 */ int signal_sample(void) { /* 創建線程 1 */ tid1 = rt_thread_create("thread1", thread1_entry, RT_NULL, THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE); if (tid1 != RT_NULL) rt_thread_startup(tid1); rt_thread_mdelay(300); /* 發送信號 SIGUSR1 給線程 1 */ rt_thread_kill(tid1, SIGUSR1); return 0; } /* 導出到 msh 命令列表中 */ MSH_CMD_EXPORT(signal_sample, signal sample);
-
運行結果:
-
\ | / - RT - Thread Operating System / | \ 3.1.0 build Aug 24 2018 2006 - 2018 Copyright by rt-thread team msh >signal_sample thread1 count : 0 thread1 count : 1 thread1 count : 2 msh >thread1 received signal 10 thread1 count : 3 thread1 count : 4 thread1 count : 5 thread1 count : 6 thread1 count : 7 thread1 count : 8 thread1 count : 9
參考
- 《RT-Thread 編程指南》