RT-Thread--線程間同步


線程間同步

  • 一個線程從傳感器中接收數據並且將數據寫到共享內存中,同時另一個線程周期性的從共享內存中讀取數據並發送去顯示,下圖描述了兩個線程間的數據傳遞:

  • 如果對共享內存的訪問不是排他性的,那么各個線程間可能同時訪問它,這將引起數據一致性的問題。例如,在顯示線程試圖顯示數據之前,接收線程還未完成數據的寫入,那么顯示將包含不同時間采樣的數據,造成顯示數據的錯亂。
  • 將傳感器數據寫入到共享內存塊的接收線程 #1 和將傳感器數據從共享內存塊中讀出的線程 #2 都會訪問同一塊內存。為了防止出現數據的差錯,兩個線程訪問的動作必須是互斥進行的,應該是在一個線程對共享內存塊操作完成后,才允許另一個線程去操作,這樣,接收線程 #1 與顯示線程 #2 才能正常配合,使此項工作正確地執行。
  • 同步是指按預定的先后次序進行運行,線程同步是指多個線程通過特定的機制(如互斥量,事件對象,臨界區)來控制線程之間的執行順序,也可以說是在線程之間通過同步建立起執行順序的關系,如果沒有同步,那線程之間將是無序的。
  • 多個線程操作 / 訪問同一塊區域(代碼),這塊代碼就稱為臨界區;
  • 線程的同步方式有很多種,其核心思想都是:在訪問臨界區的時候只允許一個 (或一類) 線程運行。

信號量(semaphore)

信號量工作機制

  • 信號量是一種輕型的用於解決線程間同步問題的內核對象,線程可以獲取或釋放它,從而達到同步或互斥的目的。
  • 每個信號量對象都有一個信號量值和一個線程等待隊列,信號量的值對應了信號量對象的實例數目、資源數目,假如信號量值為 5,則表示共有 5 個信號量實例(資源)可以被使用,當信號量實例數目為零時,再申請該信號量的線程就會被掛起在該信號量的等待隊列上,等待可用的信號量實例(資源)。
  • 信號量工作示意圖如下圖所示:

信號量控制塊

  • 信號量控制塊是操作系統用於管理信號量的一個數據結構,由結構體 struct rt_semaphore 表示,定義如下
  • /**
     * Semaphore structure
     */
    struct rt_semaphore
    {
        struct rt_ipc_object parent;                        /**< inherit from ipc_object */
    
        rt_uint16_t          value;                         /**< value of semaphore. */
    };
    typedef struct rt_semaphore *rt_sem_t;
  • rt_semaphore 對象從 rt_ipc_object 中派生,由 IPC 容器所管理,信號量的最大值是 65535。

信號量管理方式

  • 信號量控制塊中含有信號量相關的重要參數,在信號量各種狀態間起到紐帶的作用。對一個信號量的操作包含:創建 / 初始化信號量、獲取信號量、釋放信號量、刪除 / 脫離信號量。

 

 創建和刪除信號量

  • 當創建一個信號量時,內核首先創建一個信號量控制塊,然后對該控制塊進行基本的初始化工作;
  • /**
     * This function will create a semaphore from system resource
     *
     * @param name the name of semaphore
     * @param value the init value of semaphore
     * @param flag the flag of semaphore
     *
     * @return the created semaphore, RT_NULL on error happen
     *
     * @see rt_sem_init
     */
    rt_sem_t rt_sem_create(const char *name, rt_uint32_t value, rt_uint8_t flag)
  • 當調用這個函數時,系統將先從對象管理器中分配一個 semaphore 對象,並初始化這個對象,然后初始化父類 IPC 對象以及與 semaphore 相關的部分。在創建信號量指定的參數中,信號量標志參數決定了當信號量不可用時,多個線程等待的排隊方式。
  • 當選擇 RT_IPC_FLAG_FIFO(先進先出)方式時,那么等待線程隊列將按照先進先出的方式排隊,先進入的線程將先獲得等待的信號量;
  • 當選擇 RT_IPC_FLAG_PRIO(優先級等待)方式時,等待線程隊列將按照優先級進行排隊,優先級高的等待線程將先獲得等待的信號量。
  • 系統不再使用信號量時,可通過刪除信號量以釋放系統資源,適用於動態創建的信號量。
  • /**
     * This function will delete a semaphore object and release the memory
     *
     * @param sem the semaphore object
     *
     * @return the error code : RT_EOK
     *
     * @see rt_sem_detach 
     */
    rt_err_t rt_sem_delete(rt_sem_t sem)

    初始化和脫離信號量

  • 對於靜態信號量對象,它的內存空間在編譯時期就被編譯器分配出來,放在讀寫數據段或未初始化數據段上,此時使用信號量就不再需要使用 rt_sem_create 接口來創建它,而只需在使用前對它進行初始化即可;
  • 當調用這個函數時,系統將對這個 semaphore 對象進行初始化,然后初始化 IPC 對象以及與 semaphore 相關的部分。信號量標志可用上面創建信號量函數里提到的標志(RT_IPC_FLAG_FIFO、RT_IPC_FLAG_PRIO);
  • /**
     * This function will initialize a semaphore and put it under control of
     * resource management.
     *
     * @param sem the semaphore object
     * @param name the name of semaphore
     * @param value the init value of semaphore
     * @param flag the flag of semaphore
     *
     * @return the operation status, RT_EOK on successful
     */
    rt_err_t rt_sem_init(rt_sem_t    sem,
                         const char *name,
                         rt_uint32_t value,
                         rt_uint8_t  flag)

     

  • 脫離信號量就是讓信號量對象從內核對象管理器中脫離,適用於靜態初始化的信號量。

  • /**
     * This function will detach a semaphore from resource management
     *
     * @param sem the semaphore object
     *
     * @return the operation status, RT_EOK on successful
     *
     * @see rt_sem_delete
     */
    rt_err_t rt_sem_detach(rt_sem_t sem)

獲取信號量

  • 線程通過獲取信號量來獲得信號量資源實例,當信號量值大於零時,線程將獲得信號量,並且相應的信號量值會減 1;
  • /**
     * This function will take a semaphore, if the semaphore is unavailable, the
     * thread shall wait for a specified time.
     *
     * @param sem the semaphore object
     * @param time the waiting time
     *
     * @return the error code : RT_EOK  -RT_ETIMEOUT  -RT_ERROR
     */
    rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)

     

  •  在調用這個函數時,如果信號量的值等於零,那么說明當前信號量資源實例不可用,申請該信號量的線程將根據 time 參數的情況選擇直接返回、或掛起等待一段時間、或永久等待,直到其他線程或中斷釋放該信號量。如果在參數 time 指定的時間內依然得不到信號量,線程將超時返回,返回值是 - RT_ETIMEOUT;

無等待獲取信號量

  • 當用戶不想在申請的信號量上掛起線程進行等待時,可以使用無等待方式獲取信號量;
  • /**
     * This function will try to take a semaphore and immediately return
     *
     * @param sem the semaphore object
     *
     * @return the error code : - RT_ETIMEOUT RT_EOK
     */
    rt_err_t rt_sem_trytake(rt_sem_t sem)

     

  • 這個函數與 rt_sem_take(sem, 0) 的作用相同,即當線程申請的信號量資源實例不可用的時候,它不會等待在該信號量上,而是直接返回 - RT_ETIMEOUT;

釋放信號量

  • 釋放信號量可以喚醒掛起在該信號量上的線程;
  • /**
     * This function will release a semaphore, if there are threads suspended on
     * semaphore, it will be waked up.
     *
     * @param sem the semaphore object
     *
     * @return the error code
     */
    rt_err_t rt_sem_release(rt_sem_t sem)

     

  • 例如當信號量的值等於零時,並且有線程等待這個信號量時,釋放信號量將喚醒等待在該信號量線程隊列中的第一個線程,由它獲取信號量;否則將把信號量的值加 1。

信號量應用示例

  • 使用 2 個線程、3 個信號量實現生產者與消費者的例子。其中:
  • 3 個信號量分別為:①lock:信號量鎖的作用,因為 2 個線程都會對同一個數組 array 進行操作,所以該數組是一個共享資源,鎖用來保護這個共享資源。②empty:空位個數,初始化為 5 個空位。③full:滿位個數,初始化為 0 個滿位。

  • 2 個線程分別為:①生產者線程:獲取到空位后,產生一個數字,循環放入數組中,然后釋放一個滿位。②消費者線程:獲取到滿位后,讀取數組內容並相加,然后釋放一個空位。
  • #include <rtthread.h>
    
    #define THREAD_PRIORITY       6
    #define THREAD_STACK_SIZE     512
    #define THREAD_TIMESLICE      5
    
    /* 定義最大 5 個元素能夠被產生 */
    #define MAXSEM 5
    
    /* 用於放置生產的整數數組 */
    rt_uint32_t array[MAXSEM];
    
    /* 指向生產者、消費者在 array 數組中的讀寫位置 */
    static rt_uint32_t set, get;
    
    /* 指向線程控制塊的指針 */
    static rt_thread_t producer_tid = RT_NULL;
    static rt_thread_t consumer_tid = RT_NULL;
    
    struct rt_semaphore sem_lock;
    struct rt_semaphore sem_empty, sem_full;
    
    /* 生產者線程入口 */
    void producer_thread_entry(void *parameter)
    {
        int cnt = 0;
    
        /* 運行 10 次 */
        while (cnt < 10)
        {
            /* 獲取一個空位 */
            rt_sem_take(&sem_empty, RT_WAITING_FOREVER);
    
            /* 修改 array 內容,上鎖 */
            rt_sem_take(&sem_lock, RT_WAITING_FOREVER);
            array[set % MAXSEM] = cnt + 1;
            rt_kprintf("the producer generates a number: %d\n", array[set % MAXSEM]);
            set++;
            rt_sem_release(&sem_lock);
    
            /* 發布一個滿位 */
            rt_sem_release(&sem_full);
            cnt++;
    
            /* 暫停一段時間 */
            rt_thread_mdelay(20);
        }
    
        rt_kprintf("the producer exit!\n");
    }
    
    /* 消費者線程入口 */
    void consumer_thread_entry(void *parameter)
    {
        rt_uint32_t sum = 0;
    
        while (1)
        {
            /* 獲取一個滿位 */
            rt_sem_take(&sem_full, RT_WAITING_FOREVER);
    
            /* 臨界區,上鎖進行操作 */
            rt_sem_take(&sem_lock, RT_WAITING_FOREVER);
            sum += array[get % MAXSEM];
            rt_kprintf("the consumer[%d] get a number: %d\n", (get % MAXSEM), array[get % MAXSEM]);
            get++;
            rt_sem_release(&sem_lock);
    
            /* 釋放一個空位 */
            rt_sem_release(&sem_empty);
    
            /* 生產者生產到 10 個數目,停止,消費者線程相應停止 */
            if (get == 10) break;
    
            /* 暫停一小會時間 */
            rt_thread_mdelay(50);
        }
    
        rt_kprintf("the consumer sum is: %d\n", sum);
        rt_kprintf("the consumer exit!\n");
    }
    
    int producer_consumer(void)
    {
        set = 0;
        get = 0;
    
        /* 初始化 3 個信號量 */
        rt_sem_init(&sem_lock, "lock",     1,      RT_IPC_FLAG_FIFO);
        rt_sem_init(&sem_empty, "empty",   MAXSEM, RT_IPC_FLAG_FIFO);
        rt_sem_init(&sem_full, "full",     0,      RT_IPC_FLAG_FIFO);
    
        /* 創建生產者線程 */
        producer_tid = rt_thread_create("producer",
                                        producer_thread_entry, RT_NULL,
                                        THREAD_STACK_SIZE,
                                        THREAD_PRIORITY - 1,
                                        THREAD_TIMESLICE);
        if (producer_tid != RT_NULL)
        {
            rt_thread_startup(producer_tid);
        }
        else
        {
            rt_kprintf("create thread producer failed");
            return -1;
        }
    
        /* 創建消費者線程 */
        consumer_tid = rt_thread_create("consumer",
                                        consumer_thread_entry, RT_NULL,
                                        THREAD_STACK_SIZE,
                                        THREAD_PRIORITY + 1,
                                        THREAD_TIMESLICE);
        if (consumer_tid != RT_NULL)
        {
            rt_thread_startup(consumer_tid);
        }
        else
        {
            rt_kprintf("create thread consumer failed");
            return -1;
        }
    
        return 0;
    }
    
    /* 導出到 msh 命令列表中 */
    MSH_CMD_EXPORT(producer_consumer, producer_consumer sample);

    運行結果

  • \ | /
    - RT -     Thread Operating System
     / | \     3.1.0 build Aug 27 2018
     2006 - 2018 Copyright by rt-thread team
    msh >producer_consumer
    the producer generates a number: 1
    the consumer[0] get a number: 1
    msh >the producer generates a number: 2
    the producer generates a number: 3
    the consumer[1] get a number: 2
    the producer generates a number: 4
    the producer generates a number: 5
    the producer generates a number: 6
    the consumer[2] get a number: 3
    the producer generates a number: 7
    the producer generates a number: 8
    the consumer[3] get a number: 4
    the producer generates a number: 9
    the consumer[4] get a number: 5
    the producer generates a number: 10
    the producer exit!
    the consumer[0] get a number: 6
    the consumer[1] get a number: 7
    the consumer[2] get a number: 8
    the consumer[3] get a number: 9
    the consumer[4] get a number: 10
    the consumer sum is: 55
    the consumer exit!

     

  • 程序分析

  • 生產者線程:
  1. 獲取 1 個空位(放產品 number),此時空位減 1;
  2. 上鎖保護;本次的產生的 number 值為 cnt+1,把值循環存入數組 array 中;再開鎖;
  3. 釋放 1 個滿位(給倉庫中放置一個產品,倉庫就多一個滿位),滿位加 1;
  • 消費者線程:
  1. 獲取 1 個滿位(取產品 number),此時滿位減 1;
  2. 上鎖保護;將本次生產者生產的 number 值從 array 中讀出來,並與上次的 number 值相加;再開鎖;
  3. 釋放 1 個空位(從倉庫上取走一個產品,倉庫就多一個空位),空位加 1。
  • 生產者依次產生 10 個 number,消費者依次取走,並將 10 個 number 的值求和。信號量鎖 lock 保護 array 臨界區資源:保證了消費者每次取 number 值的排他性,實現了線程間同步。

信號量使用場合

  • 信號量是一種非常靈活的同步方式,可以運用在多種場合中。形成鎖、同步、資源計數等關系,也能方便的用於線程與線程、中斷與線程間的同步中。

線程同步

  • 線程同步是信號量最簡單的一類應用。例如,使用信號量進行兩個線程之間的同步,信號量的值初始化成 0,表示具備 0 個信號量資源實例;而嘗試獲得該信號量的線程,將直接在這個信號量上進行等待。
  • 當持有信號量的線程完成它處理的工作時,釋放這個信號量,可以把等待在這個信號量上的線程喚醒,讓它執行下一部分工作。這類場合也可以看成把信號量用於工作完成標志:持有信號量的線程完成它自己的工作,然后通知等待該信號量的線程繼續下一部分工作。

鎖(二值信號量)

  • 鎖,單一的鎖常應用於多個線程間對同一共享資源(即臨界區)的訪問;
  • struct  rt_semaphore sem_lock;
    rt_sem_take(&sem_lock, RT_WAITING_FOREVER); //上鎖 rt_sem_release(&sem_lock); //解鎖
  • 信號量在作為鎖來使用時,通常應將信號量資源實例初始化成 1,代表系統默認有一個資源可用,因為信號量的值始終在 1 和 0 之間變動,所以這類鎖也叫做二值信號量;

中斷與線程的同步

  • 信號量也能夠方便地應用於中斷與線程間的同步,例如一個中斷觸發,中斷服務例程需要通知線程進行相應的數據處理。這個時候可以設置信號量的初始值是 0,線程在試圖持有這個信號量時,由於信號量的初始值是 0,線程直接在這個信號量上掛起直到信號量被釋放。當中斷觸發時,先進行與硬件相關的動作,例如從硬件的 I/O 口中讀取相應的數據,並確認中斷以清除中斷源,而后釋放一個信號量來喚醒相應的線程以做后續的數據處理。例如 FinSH 線程的處理方式

 

  • 信號量的值初始為 0,當 FinSH 線程試圖取得信號量時,因為信號量值是 0,所以它會被掛起。當 console 設備有數據輸入時,產生中斷,從而進入中斷服務例程。在中斷服務例程中,它會讀取 console 設備的數據,並把讀得的數據放入 UART buffer 中進行緩沖,而后釋放信號量,釋放信號量的操作將喚醒 shell 線程。在中斷服務例程運行完畢后,如果系統中沒有比 shell 線程優先級更高的就緒線程存在時,shell 線程將持有信號量並運行,從 UART buffer 緩沖區中獲取輸入的數據。
  • 注意:中斷與線程間的互斥不能采用信號量(鎖)的方式,而應采用開關中斷的方式。

 資源計數

  • 號量也可以認為是一個遞增或遞減的計數器,需要注意的是信號量的值非負。例如:初始化一個信號量的值為 5,則這個信號量可最大連續減少 5 次,直到計數器減為 0。資源計數適合於線程間工作處理速度不匹配的場合,這個時候信號量可以做為前一線程工作完成個數的計數,而當調度到后一線程時,它也可以以一種連續的方式一次處理多個事件。
  • 注意:一般資源計數類型多是混合方式的線程間同步,因為對於單個的資源處理依然存在線程的多重訪問,這就需要對一個單獨的資源進行訪問、處理,並進行鎖方式的互斥操作。

 

互斥量(mutex)

互斥量工作機制

  • 互斥量和信號量不同的是:擁有互斥量的線程擁有互斥量的所有權,互斥量支持遞歸訪問且能防止線程優先級翻轉;並且互斥量只能由持有線程釋放,而信號量則可以由任何線程釋放。.
  • 使用信號量會導致的另一個潛在問題是線程優先級翻轉問題。所謂優先級翻轉,即當一個高優先級線程試圖通過信號量機制訪問共享資源時,如果該信號量已被一低優先級線程持有,而這個低優先級線程在運行過程中可能又被其它一些中等優先級的線程搶占,因此造成高優先級線程被許多具有較低優先級的線程阻塞,實時性難以得到保證,如下圖:

  • 有優先級為 A、B 和 C 的三個線程,優先級 A> B > C。線程 A,B 處於掛起狀態,等待某一事件觸發,線程 C 正在運行,此時線程 C 開始使用某一共享資源 M。在使用過程中,線程 A 等待的事件到來,線程 A 轉為就緒態,因為它比線程 C 優先級高,所以立即執行。但是當線程 A 要使用共享資源 M 時,由於其正在被線程 C 使用,因此線程 A 被掛起切換到線程 C 運行。如果此時線程 B 等待的事件到來,則線程 B 轉為就緒態。由於線程 B 的優先級比線程 C 高,因此線程 B 開始運行,直到其運行完畢,線程 C 才開始運行。只有當線程 C 釋放共享資源 M 后,線程 A 才得以執行。在這種情況下,優先級發生了翻轉:線程 B 先於線程 A 運行。這樣便不能保證高優先級線程的響應時間。

 

  •  在 RT-Thread 操作系統中,互斥量可以解決優先級翻轉問題,實現的是優先級繼承算法;
  • 優先級繼承是通過在線程 A 嘗試獲取共享資源而被掛起的期間內,將線程 C 的優先級提升到線程 A 的優先級別,從而解決優先級翻轉引起的問題。這樣能夠防止 C(間接地防止 A)被 B 搶占,如上圖所示。優先級繼承是指,提高某個占有某種資源的低優先級線程的優先級,使之與所有等待該資源的線程中優先級最高的那個線程的優先級相等,然后執行,而當這個低優先級線程釋放該資源時,優先級重新回到初始設定。因此,繼承優先級的線程避免了系統資源被任何中間優先級的線程搶占。

 

  • 注意:在獲得互斥量后,請盡快釋放互斥量,並且在持有互斥量的過程中,不得再行更改持有互斥量線程的優先級。

互斥量控制塊

  • 在 RT-Thread 中,互斥量控制塊是操作系統用於管理互斥量的一個數據結構,由結構體 struct rt_mutex 表示,rt_mutex 對象從 rt_ipc_object 中派生,由 IPC 容器所管理。
  • /**
     * Mutual exclusion (mutex) structure
     */
    struct rt_mutex
    {
        struct rt_ipc_object parent;                        /**< inherit from ipc_object */
    
        rt_uint16_t          value;                         /**< value of mutex */
    
        rt_uint8_t           original_priority;             /**< priority of last thread hold the mutex */
        rt_uint8_t           hold;                          /**< numbers of thread hold the mutex */
    
        struct rt_thread    *owner;                         /**< current owner of mutex */
    };
    typedef struct rt_mutex *rt_mutex_t;
    #endif

互斥量管理方式

  • 互斥量控制塊中含有互斥相關的重要參數,在互斥量功能的實現中起到重要的作用。對一個互斥量的操作包含:創建 / 初始化互斥量、獲取互斥量、釋放互斥量、刪除 / 脫離互斥量。

創建和刪除互斥量

  •  創建一個互斥量時,內核首先創建一個互斥量控制塊,然后完成對該控制塊的初始化工作。
  • /**
     * This function will create a mutex from system resource
     *
     * @param name the name of mutex
     * @param flag the flag of mutex
     *
     * @return the created mutex, RT_NULL on error happen
     *
     * @see rt_mutex_init
     */
    rt_mutex_t rt_mutex_create(const char *name, rt_uint8_t flag)
  • 可以調用 rt_mutex_create 函數創建一個互斥量,它的名字由 name 所指定。當調用這個函數時,系統將先從對象管理器中分配一個 mutex 對象,並初始化這個對象,然后初始化父類 IPC 對象以及與 mutex 相關的部分。

  • 互斥量的 flag 標志設置為 RT_IPC_FLAG_PRIO,表示在多個線程等待資源時,將由優先級高的線程優先獲得資源。

  • flag 設置為 RT_IPC_FLAG_FIFO,表示在多個線程等待資源時,將按照先來先得的順序獲得資源。下表描述了該函數的輸入參數與返回值:

  •  當刪除一個互斥量時,所有等待此互斥量的線程都將被喚醒,等待線程獲得的返回值是 - RT_ERROR。然后系統將該互斥量從內核對象管理器鏈表中刪除並釋放互斥量占用的內存空間。
  • /**
     * This function will delete a mutex object and release the memory
     *
     * @param mutex the mutex object
     *
    * @return the error code : RT_EOK
     *
     * @see rt_mutex_detach
     */
    rt_err_t rt_mutex_delete(rt_mutex_t mutex)

初始化和脫離互斥量

  •  靜態互斥量對象的內存是在系統編譯時由編譯器分配的,一般放於讀寫數據段或未初始化數據段中。在使用這類靜態互斥量對象前,需要先進行初始化。
  • /**
     * This function will initialize a mutex and put it under control of resource
     * management.
     *
     * @param mutex the mutex object
     * @param name the name of mutex
     * @param flag the flag of mutex
     *
     * @return the operation status, RT_EOK on successful
     */
    rt_err_t rt_mutex_init(rt_mutex_t mutex, const char *name, rt_uint8_t flag)
  • 脫離互斥量將把互斥量對象從內核對象管理器中脫離,適用於靜態初始化的互斥量。

  • /**
     * This function will detach a mutex from resource management
     *
     * @param mutex the mutex object
     *
     * @return the operation status, RT_EOK on successful
     *
     * @see rt_mutex_delete
     */
    rt_err_t rt_mutex_detach(rt_mutex_t mutex)

獲取互斥量

  •  線程獲取了互斥量,那么線程就有了對該互斥量的所有權,即某一個時刻一個互斥量只能被一個線程持有。
  • /**
     * This function will take a mutex, if the mutex is unavailable, the
     * thread shall wait for a specified time.
     *
     * @param mutex the mutex object
     * @param time the waiting time
     *
     * @return the error code
     */
    rt_err_t rt_mutex_take(rt_mutex_t mutex, rt_int32_t time)

釋放互斥量

  •  當線程完成互斥資源的訪問后,應盡快釋放它占據的互斥量,使得其他線程能及時獲取該互斥量。
  • /**
     * This function will release a mutex, if there are threads suspended on mutex,
     * it will be waked up.
     *
     * @param mutex the mutex object
     *
     * @return the error code
     */
    rt_err_t rt_mutex_release(rt_mutex_t mutex)
  • 使用該函數接口時,只有已經擁有互斥量控制權的線程才能釋放它,每釋放一次該互斥量,它的持有計數就減 1。當該互斥量的持有計數為零時(即持有線程已經釋放所有的持有操作),它變為可用,等待在該信號量上的線程將被喚醒。如果線程的運行優先級被互斥量提升,那么當互斥量被釋放后,線程恢復為持有互斥量前的優先級。

互斥量應用示例

  • 這是一個互斥量的應用例程,互斥鎖是一種保護共享資源的方法。當一個線程擁有互斥鎖的時候,可以保護共享資源不被其他線程破壞。
  • 有兩個線程:線程 1 和線程 2,線程 1 對 2 個 number 分別進行加 1 操作;線程 2 也對 2 個 number 分別進行加 1 操作,使用互斥量保證線程改變 2 個 number 值的操作不被打斷。如下代碼所示:
  • #include <rtthread.h>
    
    #define THREAD_PRIORITY         8
    #define THREAD_TIMESLICE        5
    
    /* 指向互斥量的指針 */
    static rt_mutex_t dynamic_mutex = RT_NULL;
    static rt_uint8_t number1,number2 = 0;
    
    ALIGN(RT_ALIGN_SIZE)
    static char thread1_stack[1024];
    static struct rt_thread thread1;
    static void rt_thread_entry1(void *parameter)
    {
          while(1)
          {
              /* 線程 1 獲取到互斥量后,先后對 number1、number2 進行加 1 操作,然后釋放互斥量 */
              rt_mutex_take(dynamic_mutex, RT_WAITING_FOREVER);
              number1++;
              rt_thread_mdelay(10);
              number2++;
              rt_mutex_release(dynamic_mutex);
           }
    }
    
    ALIGN(RT_ALIGN_SIZE)
    static char thread2_stack[1024];
    static struct rt_thread thread2;
    static void rt_thread_entry2(void *parameter)
    {
          while(1)
          {
              /* 線程 2 獲取到互斥量后,檢查 number1、number2 的值是否相同,相同則表示 mutex 起到了鎖的作用 */
              rt_mutex_take(dynamic_mutex, RT_WAITING_FOREVER);
              if(number1 != number2)
              {
                rt_kprintf("not protect.number1 = %d, mumber2 = %d \n",number1 ,number2);
              }
              else
              {
                rt_kprintf("mutex protect ,number1 = mumber2 is %d\n",number1);
              }
    
               number1++;
               number2++;
               rt_mutex_release(dynamic_mutex);
    
              if(number1>=50)
                  return;
          }
    }
    
    /* 互斥量示例的初始化 */
    int mutex_sample(void)
    {
        /* 創建一個動態互斥量 */
        dynamic_mutex = rt_mutex_create("dmutex", RT_IPC_FLAG_FIFO);
        if (dynamic_mutex == RT_NULL)
        {
            rt_kprintf("create dynamic mutex failed.\n");
            return -1;
        }
    
        rt_thread_init(&thread1,
                       "thread1",
                       rt_thread_entry1,
                       RT_NULL,
                       &thread1_stack[0],
                       sizeof(thread1_stack),
                       THREAD_PRIORITY, THREAD_TIMESLICE);
        rt_thread_startup(&thread1);
    
        rt_thread_init(&thread2,
                       "thread2",
                       rt_thread_entry2,
                       RT_NULL,
                       &thread2_stack[0],
                       sizeof(thread2_stack),
                       THREAD_PRIORITY-1, THREAD_TIMESLICE);
        rt_thread_startup(&thread2);
        return 0;
    }
    
    /* 導出到 MSH 命令列表中 */
    MSH_CMD_EXPORT(mutex_sample, mutex sample);
  • 線程 1 與線程 2 中均使用互斥量保護對 2 個 number 的操作(倘若將線程 1 中的獲取、釋放互斥量語句注釋掉,線程 1 將對 number 不再做保護),運行結果如下:

  • \ | /
    - RT -     Thread Operating System
     / | \     3.1.0 build Aug 24 2018
     2006 - 2018 Copyright by rt-thread team
    msh >mutex_sample
    msh >mutex protect ,number1 = mumber2 is 1
    mutex protect ,number1 = mumber2 is 2
    mutex protect ,number1 = mumber2 is 3
    mutex protect ,number1 = mumber2 is 4
    …
    mutex protect ,number1 = mumber2 is 48
    mutex protect ,number1 = mumber2 is 49

     

  • 防止優先級翻轉特性例程:創建 3 個動態線程以檢查持有互斥量時,持有的線程優先級是否被調整到等待線程優先級中的最高優先級。
  • #include <rtthread.h>
    
    /* 指向線程控制塊的指針 */
    static rt_thread_t tid1 = RT_NULL;
    static rt_thread_t tid2 = RT_NULL;
    static rt_thread_t tid3 = RT_NULL;
    static rt_mutex_t mutex = RT_NULL;
    
    
    #define THREAD_PRIORITY       10
    #define THREAD_STACK_SIZE     512
    #define THREAD_TIMESLICE    5
    
    /* 線程 1 入口 */
    static void thread1_entry(void *parameter)
    {
        /* 先讓低優先級線程運行 */
        rt_thread_mdelay(100);
    
        /* 此時 thread3 持有 mutex,並且 thread2 等待持有 mutex */
    
        /* 檢查 thread2 與 thread3 的優先級情況 */
        if (tid2->current_priority != tid3->current_priority)
        {
            /* 優先級不相同,測試失敗 */
            rt_kprintf("the priority of thread2 is: %d\n", tid2->current_priority);
            rt_kprintf("the priority of thread3 is: %d\n", tid3->current_priority);
            rt_kprintf("test failed.\n");
            return;
        }
        else
        {
            rt_kprintf("the priority of thread2 is: %d\n", tid2->current_priority);
            rt_kprintf("the priority of thread3 is: %d\n", tid3->current_priority);
            rt_kprintf("test OK.\n");
        }
    }
    
    /* 線程 2 入口 */
    static void thread2_entry(void *parameter)
    {
        rt_err_t result;
    
        rt_kprintf("the priority of thread2 is: %d\n", tid2->current_priority);
    
        /* 先讓低優先級線程運行 */
        rt_thread_mdelay(50);
    
        /*
         * 試圖持有互斥鎖,此時 thread3 持有,應把 thread3 的優先級提升
         * 到 thread2 相同的優先級
         */
        result = rt_mutex_take(mutex, RT_WAITING_FOREVER);
    
        if (result == RT_EOK)
        {
            /* 釋放互斥鎖 */
            rt_mutex_release(mutex);
        }
    }
    
    /* 線程 3 入口 */
    static void thread3_entry(void *parameter)
    {
        rt_tick_t tick;
        rt_err_t result;
    
        rt_kprintf("the priority of thread3 is: %d\n", tid3->current_priority);
    
        result = rt_mutex_take(mutex, RT_WAITING_FOREVER);
        if (result != RT_EOK)
        {
            rt_kprintf("thread3 take a mutex, failed.\n");
        }
    
        /* 做一個長時間的循環,500ms */
        tick = rt_tick_get();
        while (rt_tick_get() - tick < (RT_TICK_PER_SECOND / 2)) ;
    
        rt_mutex_release(mutex);
    }
    
    int pri_inversion(void)
    {
        /* 創建互斥鎖 */
        mutex = rt_mutex_create("mutex", RT_IPC_FLAG_FIFO);
        if (mutex == RT_NULL)
        {
            rt_kprintf("create dynamic mutex failed.\n");
            return -1;
        }
    
        /* 創建線程 1 */
        tid1 = rt_thread_create("thread1",
                                thread1_entry,
                                RT_NULL,
                                THREAD_STACK_SIZE,
                                THREAD_PRIORITY - 1, THREAD_TIMESLICE);
        if (tid1 != RT_NULL)
             rt_thread_startup(tid1);
    
        /* 創建線程 2 */
        tid2 = rt_thread_create("thread2",
                                thread2_entry,
                                RT_NULL,
                                THREAD_STACK_SIZE,
                                THREAD_PRIORITY, THREAD_TIMESLICE);
        if (tid2 != RT_NULL)
            rt_thread_startup(tid2);
    
        /* 創建線程 3 */
        tid3 = rt_thread_create("thread3",
                                thread3_entry,
                                RT_NULL,
                                THREAD_STACK_SIZE,
                                THREAD_PRIORITY + 1, THREAD_TIMESLICE);
        if (tid3 != RT_NULL)
            rt_thread_startup(tid3);
    
        return 0;
    }
    
    /* 導出到 msh 命令列表中 */
    MSH_CMD_EXPORT(pri_inversion, prio_inversion sample);
  • 運行結果

  • \ | /
    - RT -     Thread Operating System
     / | \     3.1.0 build Aug 27 2018
     2006 - 2018 Copyright by rt-thread team
    msh >pri_inversion
    the priority of thread2 is: 10
    the priority of thread3 is: 11
    the priority of thread2 is: 10
    the priority of thread3 is: 10
    test OK.

    注意:互斥量不能在中斷服務例程中使用

互斥量使用場合

  • 互斥量是信號量的一種,並且它是以鎖的形式存在。在初始化的時候,互斥量永遠都處於開鎖的狀態,而被線程持有的時候則立刻轉為閉鎖的狀態。互斥量更適合於:
  1. 線程多次持有互斥量的情況下。這樣可以避免同一線程多次遞歸持有而造成死鎖的問題。
  2. 可能會由於多線程同步而造成優先級翻轉的情況。

事件集(event)

  • 事件集也是線程間同步的機制之一,一個事件集可以包含多個事件,利用事件集可以完成一對多,多對多的線程間同步。下面以坐公交為例說明事件,在公交站等公交時可能有以下幾種情況:
  • ①P1 坐公交去某地,只有一種公交可以到達目的地,等到此公交即可出發。
  • ②P1 坐公交去某地,有 3 種公交都可以到達目的地,等到其中任意一輛即可出發。
  • ③P1 約另一人 P2 一起去某地,則 P1 必須要等到 “同伴 P2 到達公交站” 與“公交到達公交站”兩個條件都滿足后,才能出發。
  • 這里,可以將 P1 去某地視為線程,將 “公交到達公交站”、“同伴 P2 到達公交站” 視為事件的發生,情況①是特定事件喚醒線程;情況②是任意單個事件喚醒線程;情況③是多個事件同時發生才喚醒線程。

事件集工作機制

  • 事件集主要用於線程間的同步,與信號量不同,它的特點是可以實現一對多,多對多的同步。即一個線程與多個事件的關系可設置為:其中任意一個事件喚醒線程,或幾個事件都到達后才喚醒線程進行后續的處理;同樣,事件也可以是多個線程同步多個事件。
  • 這種多個事件的集合可以用一個 32 位無符號整型變量來表示,變量的每一位代表一個事件,線程通過 “邏輯與” 或“邏輯或”將一個或多個事件關聯起來,形成事件組合。事件的 “邏輯或” 也稱為是獨立型同步,指的是線程與任何事件之一發生同步;事件 “邏輯與” 也稱為是關聯型同步,指的是線程與若干事件都發生同步。
  • T-Thread 定義的事件集有以下特點:
  • 事件只與線程相關,事件間相互獨立:每個線程可擁有 32 個事件標志,采用一個 32 bit 無符號整型數進行記錄,每一個 bit 代表一個事件;
  • 事件僅用於同步,不提供數據傳輸功能;
  • 事件無排隊性,即多次向線程發送同一事件 (如果線程還未來得及讀走),其效果等同於只發送一次。
  • 每個線程都擁有一個事件信息標記,它有三個屬性,分別是 RT_EVENT_FLAG_AND(邏輯與),RT_EVENT_FLAG_OR(邏輯或)以及 RT_EVENT_FLAG_CLEAR(清除標記)。當線程等待事件同步時,可以通過 32 個事件標志和這個事件信息標記來判斷當前接收的事件是否滿足同步條件。

  • 如上圖所示,線程 #1 的事件標志中第 1 位和第 30 位被置位,如果事件信息標記位設為邏輯與,則表示線程 #1 只有在事件 1 和事件 30 都發生以后才會被觸發喚醒,如果事件信息標記位設為邏輯或,則事件 1 或事件 30 中的任意一個發生都會觸發喚醒線程 #1。如果信息標記同時設置了清除標記位,則當線程 #1 喚醒后將主動把事件 1 和事件 30 清為零,否則事件標志將依然存在(即置 1)。

事件集控制塊

  • 在 RT-Thread 中,事件集控制塊是操作系統用於管理事件的一個數據結構,由結構體 struct rt_event 表示;
  • /*
     * event structure
     */
    struct rt_event
    {
        struct rt_ipc_object parent;                        /**< inherit from ipc_object */
    
        rt_uint32_t          set;                           /**< event set */
    };
    typedef struct rt_event *rt_event_t;

事件集管理方式

  • 事件集控制塊中含有與事件集相關的重要參數,在事件集功能的實現中起重要的作用,對一個事件集的操作包含:創建 / 初始化事件集、發送事件、接收事件、刪除 / 脫離事件集。

創建和刪除事件集

  • 當創建一個事件集時,內核首先創建一個事件集控制塊,然后對該事件集控制塊進行基本的初始化;
  • 調用該函數接口時,系統會從對象管理器中分配事件集對象,並初始化這個對象,然后初始化父類 IPC 對象。
  • /**
     * This function will create an event object from system resource
     *
     * @param name the name of event
     * @param flag the flag of event
     *
     * @return the created event, RT_NULL on error happen
     */
    rt_event_t rt_event_create(const char *name, rt_uint8_t flag)
  • 系統不再使用 rt_event_create() 創建的事件集對象時,通過刪除事件集對象控制塊來釋放系統資源;

  • 在調用 rt_event_delete 函數刪除一個事件集對象時,應該確保該事件集不再被使用。在刪除前會喚醒所有掛起在該事件集上的線程(線程的返回值是 - RT_ERROR),然后釋放事件集對象占用的內存塊。
  • /**
     * This function will delete an event object and release the memory
     *
     * @param event the event object
     *
     * @return the error code
     */
    rt_err_t rt_event_delete(rt_event_t event)

    初始化和脫離事件集

  • 靜態事件集對象的內存是在系統編譯時由編譯器分配的,一般放於讀寫數據段或未初始化數據段中。在使用靜態事件集對象前,需要先行對它進行初始化操作;
  • 調用該接口時,需指定靜態事件集對象的句柄(即指向事件集控制塊的指針),然后系統會初始化事件集對象,並加入到系統對象容器中進行管理。
  • /**
     * This function will initialize an event and put it under control of resource
     * management.
     *
     * @param event the event object
     * @param name the name of event
     * @param flag the flag of event
     *
     * @return the operation status, RT_EOK on successful
     */
    rt_err_t rt_event_init(rt_event_t event, const char *name, rt_uint8_t flag)

     

  • 系統不再使用 rt_event_init() 初始化的事件集對象時,通過脫離事件集對象控制塊來釋放系統資源。脫離事件集是將事件集對象從內核對象管理器中脫離;

  • 用戶調用這個函數時,系統首先喚醒所有掛在該事件集等待隊列上的線程(線程的返回值是 - RT_ERROR),然后將該事件集從內核對象管理器中脫離。
  •  * This function will detach an event object from resource management
     *
     * @param event the event object
     *
     * @return the operation status, RT_EOK on successful
     */
    rt_err_t rt_event_detach(rt_event_t event)

    發送事件

  •  發送事件函數可以發送事件集中的一個或多個事件;
  • 使用該函數接口時,通過參數 set 指定的事件標志來設定 event 事件集對象的事件標志值,然后遍歷等待在 event 事件集對象上的等待線程鏈表,判斷是否有線程的事件激活要求與當前 event 對象事件標志值匹配,如果有,則喚醒該線程;
  • /**
     * This function will send an event to the event object, if there are threads
     * suspended on event object, it will be waked up.
     *
     * @param event the event object
     * @param set the event set
     *
     * @return the error code
     */
    rt_err_t rt_event_send(rt_event_t event, rt_uint32_t set)

    接收事件

  • 內核使用 32 位的無符號整數來標識事件集,它的每一位代表一個事件,因此一個事件集對象可同時等待接收 32 個事件,內核可以通過指定選擇參數 “邏輯與” 或“邏輯或”來選擇如何激活線程,使用 “邏輯與” 參數表示只有當所有等待的事件都發生時才激活線程,而使用 “邏輯或” 參數則表示只要有一個等待的事件發生就激活線程。
  • /**
     * This function will receive an event from event object, if the event is
     * unavailable, the thread shall wait for a specified time.
     *
     * @param event the fast event object
     * @param set the interested event set
     * @param option the receive option, either RT_EVENT_FLAG_AND or
     *        RT_EVENT_FLAG_OR should be set.
     * @param timeout the waiting time
     * @param recved the received event, if you don't care, RT_NULL can be set.
     *
     * @return the error code
     */
    rt_err_t rt_event_recv(rt_event_t   event,
                           rt_uint32_t  set,
                           rt_uint8_t   option,
                           rt_int32_t   timeout,
                           rt_uint32_t *recved)

     

  • 當用戶調用這個接口時,系統首先根據 set 參數和接收選項 option 來判斷它要接收的事件是否發生,如果已經發生,則根據參數 option 上是否設置有 RT_EVENT_FLAG_CLEAR 來決定是否重置事件的相應標志位,然后返回(其中 recved 參數返回接收到的事件);如果沒有發生,則把等待的 set 和 option 參數填入線程本身的結構中,然后把線程掛起在此事件上,直到其等待的事件滿足條件或等待時間超過指定的超時時間。如果超時時間設置為零,則表示當線程要接受的事件沒有滿足其要求時就不等待,而直接返回 - RT_ETIMEOUT;

事件集應用示例

  • 初始化了一個事件集,兩個線程。一個線程等待自己關心的事件發生,另外一個線程發送事件;
  • #include <rtthread.h>
    
    #define THREAD_PRIORITY      9
    #define THREAD_TIMESLICE     5
    
    #define EVENT_FLAG3 (1 << 3)
    #define EVENT_FLAG5 (1 << 5)
    
    /* 事件控制塊 */
    static struct rt_event event;
    
    ALIGN(RT_ALIGN_SIZE)
    static char thread1_stack[1024];
    static struct rt_thread thread1;
    
    /* 線程 1 入口函數 */
    static void thread1_recv_event(void *param)
    {
        rt_uint32_t e;
    
        /* 第一次接收事件,事件 3 或事件 5 任意一個可以觸發線程 1,接收完后清除事件標志 */
        if (rt_event_recv(&event, (EVENT_FLAG3 | EVENT_FLAG5),
                          RT_EVENT_FLAG_OR | RT_EVENT_FLAG_CLEAR,
                          RT_WAITING_FOREVER, &e) == RT_EOK)
        {
            rt_kprintf("thread1: OR recv event 0x%x\n", e);
        }
    
        rt_kprintf("thread1: delay 1s to prepare the second event\n");
        rt_thread_mdelay(1000);
    
        /* 第二次接收事件,事件 3 和事件 5 均發生時才可以觸發線程 1,接收完后清除事件標志 */
        if (rt_event_recv(&event, (EVENT_FLAG3 | EVENT_FLAG5),
                          RT_EVENT_FLAG_AND | RT_EVENT_FLAG_CLEAR,
                          RT_WAITING_FOREVER, &e) == RT_EOK)
        {
            rt_kprintf("thread1: AND recv event 0x%x\n", e);
        }
        rt_kprintf("thread1 leave.\n");
    }
    
    
    ALIGN(RT_ALIGN_SIZE)
    static char thread2_stack[1024];
    static struct rt_thread thread2;
    
    /* 線程 2 入口 */
    static void thread2_send_event(void *param)
    {
        rt_kprintf("thread2: send event3\n");
        rt_event_send(&event, EVENT_FLAG3);
        rt_thread_mdelay(200);
    
        rt_kprintf("thread2: send event5\n");
        rt_event_send(&event, EVENT_FLAG5);
        rt_thread_mdelay(200);
    
        rt_kprintf("thread2: send event3\n");
        rt_event_send(&event, EVENT_FLAG3);
        rt_kprintf("thread2 leave.\n");
    }
    
    int event_sample(void)
    {
        rt_err_t result;
    
        /* 初始化事件對象 */
        result = rt_event_init(&event, "event", RT_IPC_FLAG_FIFO);
        if (result != RT_EOK)
        {
            rt_kprintf("init event failed.\n");
            return -1;
        }
    
        rt_thread_init(&thread1,
                       "thread1",
                       thread1_recv_event,
                       RT_NULL,
                       &thread1_stack[0],
                       sizeof(thread1_stack),
                       THREAD_PRIORITY - 1, THREAD_TIMESLICE);
        rt_thread_startup(&thread1);
    
        rt_thread_init(&thread2,
                       "thread2",
                       thread2_send_event,
                       RT_NULL,
                       &thread2_stack[0],
                       sizeof(thread2_stack),
                       THREAD_PRIORITY, THREAD_TIMESLICE);
        rt_thread_startup(&thread2);
    
        return 0;
    }
    
    /* 導出到 msh 命令列表中 */
    MSH_CMD_EXPORT(event_sample, event sample);
  • 運行結果:

  •  \ | /
    - RT -     Thread Operating System
     / | \     3.1.0 build Aug 24 2018
     2006 - 2018 Copyright by rt-thread team
    msh >event_sample
    thread2: send event3
    thread1: OR recv event 0x8
    thread1: delay 1s to prepare the second event
    msh >thread2: send event5
    thread2: send event3
    thread2 leave.
    thread1: AND recv event 0x28
    thread1 leave.

事件集使用場合

  •  事件集可使用於多種場合,它能夠在一定程度上替代信號量,用於線程間同步。一個線程或中斷服務例程發送一個事件給事件集對象,而后等待的線程被喚醒並對相應的事件進行處理。但是它與信號量不同的是,事件的發送操作在事件未清除前,是不可累計的,而信號量的釋放動作是累計的。
  • 事件的另一個特性是,接收線程可等待多種事件,即多個事件對應一個線程或多個線程。同時按照線程等待的參數,可選擇是 “邏輯或” 觸發還是 “邏輯與” 觸發。這個特性也是信號量等所不具備的,信號量只能識別單一的釋放動作,而不能同時等待多種類型的釋放。如下圖所示為多事件接收示意圖:

  • 一個事件集中包含 32 個事件,特定線程只等待、接收它關注的事件。可以是一個線程等待多個事件的到來(線程 1、2 均等待多個事件,事件間可以使用 “與” 或者 “或” 邏輯觸發線程),也可以是多個線程等待一個事件的到來(事件 25)。當有它們關注的事件發生時,線程將被喚醒並進行后續的處理動作。

參考

  • 《RT-Thread 編程指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM