Gstreamer 數據流線程(GstTask / GstTaskPool)分析


作者:fengcc 原創文章 轉載請注明出處


GStreamer 是一個基於流水線的多媒體框架,基於 GObject,以 C 語言寫成。
憑借 GStreamer,程序員可以很容易地創建各種多媒體功能組件,包括簡單的音頻回放,音頻和視頻播放,錄音,流媒體和音頻編輯。基於流水線設計,可以創建諸如視頻編輯器、流媒體廣播和媒體播放器等等的很多多媒體應用。


GstTask/GstTaskPool — streaming threads

Gstreamer 將GstElementGstPad中關於數據流處理的線程封裝成 GstTask,並提供gst_task_start(),gst_task_pause(),gst_task_stop()等接口,使數據流的處理更加方便。例如:GstPad通常會啟動一個GstTask從另一個 pad 上拉數據或者推數據到另一個 pad。

工作模式

下面根據它提供的幾個接口來詳細介紹GstTask內部的實現。


(1)創建任務

GstTask *
gst_task_new (GstTaskFunction func,
              gpointer user_data,
              GDestroyNotify notify);

創建一個任務,該任務稍后將會在新線程里重復調用func函數,以user_data 為參數。注意,此時還沒有創建或者啟動一個新的線程。

GstTaskgst_task_class_init() 函數中會調用gst_task_pool_get_default()函數,該函數代碼如下:

GstTaskPool *
gst_task_pool_get_default (void)
{
  static GstTaskPool *pool = NULL;

  if (g_once_init_enter (&pool)) { //整個程序周期只會進入一次
    GstTaskPool *_pool = gst_task_pool_new ();

    gst_task_pool_prepare (_pool, NULL);
    g_once_init_leave (&pool, _pool);
  }

  return gst_object_ref (pool);
}

它利用g_once_init_enter()/g_once_init_leave()操作來確保整個程序的運行周期中只會創建一個默認GstTaskPool結構,GstTaskPool結構是對 glib 線程池的封裝。之后再調用gst_task_pool_prepare()函數,該函數會調用GstTaskPooldefault_prepare()函數來創建一個默認的 glib 線程池,使用的是上面提到的g_thread_pool_new()接口。

傳遞給g_thread_pool_new()的第一個參數為default_func函數:

static void
default_func (TaskData * tdata, GstTaskPool * pool)
{
  GstTaskPoolFunction func;
  gpointer user_data;

  func = tdata->func;
  user_data = tdata->user_data;
  g_slice_free (TaskData, tdata);

  func (user_data);	//運行自定義的任務
}

它的做法很簡單,拆解傳進來的參數,調用相應的函數,從而讓線程池中的每個線程運行自定義的任務。

當我們調用gst_task_new()函數創建一個新的任務時,會觸發GstTaskgst_task_init()函數,該函數會將GstTask結構體中的GstTaskPool *pool變量指向剛才創建的默認的GstTaskPool。所以,整個程序中所有的GstTask 依賴的都是同一個GstTaskPool結構,即所有的GstTask線程都運行在同一個線程池中。


(2)啟動任務

gboolean
gst_task_start (GstTask *task);

或:

gboolean
gst_task_set_state (GstTask *task,
                    GstTaskState state);

對於GstTask的任務,有三種狀態:

  • GST_TASK_STARTED:任務已經啟動,正在相應的線程中運行。
  • GST_TASK_STOPPED:任務已經停止,此時還沒有啟動相應的線程,或者線程已經運行結束退出。
  • GST_TASK_PAUSED:任務暫停。任務對應的線程沒有退出,處於暫停狀態,其實就是讓線程阻塞在某個條件變量上。

gst_task_start()實際上就是調用gst_task_set_state (task, GST_TASK_STARTED),將任務設置為開始狀態。

gst_task_set_state()會先記錄任務的原始狀態為old,再將任務設置為新的狀態,然后根據old,執行相關的操作,關鍵代碼如下:

/* if the state changed, do our thing */
  old = GET_TASK_STATE (task);
  if (old != state) {
    SET_TASK_STATE (task, state);

    switch (old) {
      case GST_TASK_STOPPED:
        /* If the task already has a thread scheduled we don't have to do
         * anything. */
        if (G_UNLIKELY (!task->running) &&
            (!task->priv->scheduleable || (task->priv->should_schedule
                    && state == GST_TASK_STARTED)))
          res = start_task (task);
        break;
      case GST_TASK_PAUSED:
        /* when we are paused, signal to go to the new state */
        GST_TASK_SIGNAL (task);
        break;
      case GST_TASK_STARTED:
        /* if we were started, we'll go to the new state after the next
         * iteration. */
        break;
    }
  }
  • 如果舊狀態是GST_TASK_STOPPED,則新狀態肯定是 GST_TASK_STARTED,則調用start_task函數,該函數稍候會解釋。
  • 如果舊狀態是GST_TASK_PAUSED,則增加相應的條件變量,這樣,任務對應的線程就會結束在此條件變量的阻塞,從而完成狀態轉換。
  • 如果舊狀態是GST_TASK_STARTED,這個注釋的解釋沒有看懂,還要繼續琢磨一下,抱歉。

start_task()函數中,以gst_task_func()函數為參數調用gst_task_pool_push()gst_task_pool_push()是對default_push()函數的封裝。default_push()函數代碼如下:

static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func,
    gpointer user_data, GError ** error)
{
  TaskData *tdata;

  tdata = g_slice_new (TaskData);
  tdata->func = func;
  tdata->user_data = user_data;

  GST_OBJECT_LOCK (pool);
  if (pool->pool)
    g_thread_pool_push (pool->pool, tdata, error);
  else {
    g_slice_free (TaskData, tdata);
  }
  GST_OBJECT_UNLOCK (pool);

  return NULL;
}

funcuser_data封裝成TaskData結構體,作為參數調用上面提到的g_thread_pool_push()函數將任務插入到線程池的任務列表中。TaskData結構體的拆解在上面提到的default_func()函數中,線程池中的每個線程啟動時都會調用該函數拆解參數,然后線程便切換到gst_task_func()函數運行。

注意:這里的funcgst_task_func()函數,user_data是任務對應的GstTask結構體。而使用gst_task_new()創建任務時傳入的自定義函數和參數是分別保存在GstTasktask->functask->user_data成員變量中,將會在gst_task_func()中采用task->func (task->user_data)的方式調用。

最后,每個線程中運行的其實都是gst_task_func()函數,該函數關鍵代碼如下:

while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
    GST_OBJECT_LOCK (task);

    if (G_UNLIKELY (priv->scheduleable
            && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
      GST_OBJECT_UNLOCK (task);
      break;
    }

    while (G_UNLIKELY (!priv->scheduleable
            && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
      g_rec_mutex_unlock (lock);

      GST_TASK_SIGNAL (task);
      GST_INFO_OBJECT (task, "Task going to paused");
      GST_TASK_WAIT (task);
      GST_INFO_OBJECT (task, "Task resume from paused");
      GST_OBJECT_UNLOCK (task);
      /* locking order.. */
      g_rec_mutex_lock (lock);
      GST_OBJECT_LOCK (task);
    }

    if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
      GST_OBJECT_UNLOCK (task);
      break;
    } else {
      GST_OBJECT_UNLOCK (task);
    }

    // 調用用戶自定義的函數。
    task->func (task->user_data);

    if (priv->scheduleable)
      break;
  }

如果任務被設置成GST_TASK_STOPPED狀態,則退出循環,結束線程運行。若任務為暫停狀態,則在對應的條件變量上阻塞,否則,就一直循環調用用戶自定義函數處理數據流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM