作者:fengcc 原創文章 轉載請注明出處
GStreamer 是一個基於流水線的多媒體框架,基於 GObject,以 C 語言寫成。
憑借 GStreamer,程序員可以很容易地創建各種多媒體功能組件,包括簡單的音頻回放,音頻和視頻播放,錄音,流媒體和音頻編輯。基於流水線設計,可以創建諸如視頻編輯器、流媒體廣播和媒體播放器等等的很多多媒體應用。
GstTask/GstTaskPool — streaming threads
Gstreamer 將GstElement
和 GstPad
中關於數據流處理的線程封裝成 GstTask
,並提供gst_task_start()
,gst_task_pause()
,gst_task_stop()
等接口,使數據流的處理更加方便。例如:GstPad
通常會啟動一個GstTask
從另一個 pad 上拉數據或者推數據到另一個 pad。
下面根據它提供的幾個接口來詳細介紹GstTask
內部的實現。
(1)創建任務
GstTask *
gst_task_new (GstTaskFunction func,
gpointer user_data,
GDestroyNotify notify);
創建一個任務,該任務稍后將會在新線程里重復調用func
函數,以user_data
為參數。注意,此時還沒有創建或者啟動一個新的線程。
在GstTask
的 gst_task_class_init()
函數中會調用gst_task_pool_get_default()
函數,該函數代碼如下:
GstTaskPool *
gst_task_pool_get_default (void)
{
static GstTaskPool *pool = NULL;
if (g_once_init_enter (&pool)) { //整個程序周期只會進入一次
GstTaskPool *_pool = gst_task_pool_new ();
gst_task_pool_prepare (_pool, NULL);
g_once_init_leave (&pool, _pool);
}
return gst_object_ref (pool);
}
它利用g_once_init_enter()/g_once_init_leave()
操作來確保整個程序的運行周期中只會創建一個默認的GstTaskPool
結構,GstTaskPool
結構是對 glib 線程池的封裝。之后再調用gst_task_pool_prepare()
函數,該函數會調用GstTaskPool
的default_prepare()
函數來創建一個默認的 glib 線程池,使用的是上面提到的g_thread_pool_new()
接口。
傳遞給g_thread_pool_new()
的第一個參數為default_func
函數:
static void
default_func (TaskData * tdata, GstTaskPool * pool)
{
GstTaskPoolFunction func;
gpointer user_data;
func = tdata->func;
user_data = tdata->user_data;
g_slice_free (TaskData, tdata);
func (user_data); //運行自定義的任務
}
它的做法很簡單,拆解傳進來的參數,調用相應的函數,從而讓線程池中的每個線程運行自定義的任務。
當我們調用gst_task_new()
函數創建一個新的任務時,會觸發GstTask
的gst_task_init()
函數,該函數會將GstTask
結構體中的GstTaskPool *pool
變量指向剛才創建的默認的GstTaskPool
。所以,整個程序中所有的GstTask
依賴的都是同一個GstTaskPool
結構,即所有的GstTask
線程都運行在同一個線程池中。
(2)啟動任務
gboolean
gst_task_start (GstTask *task);
或:
gboolean
gst_task_set_state (GstTask *task,
GstTaskState state);
對於GstTask
的任務,有三種狀態:
- GST_TASK_STARTED:任務已經啟動,正在相應的線程中運行。
- GST_TASK_STOPPED:任務已經停止,此時還沒有啟動相應的線程,或者線程已經運行結束退出。
- GST_TASK_PAUSED:任務暫停。任務對應的線程沒有退出,處於暫停狀態,其實就是讓線程阻塞在某個條件變量上。
gst_task_start()
實際上就是調用gst_task_set_state (task, GST_TASK_STARTED)
,將任務設置為開始狀態。
gst_task_set_state()
會先記錄任務的原始狀態為old
,再將任務設置為新的狀態,然后根據old
,執行相關的操作,關鍵代碼如下:
/* if the state changed, do our thing */
old = GET_TASK_STATE (task);
if (old != state) {
SET_TASK_STATE (task, state);
switch (old) {
case GST_TASK_STOPPED:
/* If the task already has a thread scheduled we don't have to do
* anything. */
if (G_UNLIKELY (!task->running) &&
(!task->priv->scheduleable || (task->priv->should_schedule
&& state == GST_TASK_STARTED)))
res = start_task (task);
break;
case GST_TASK_PAUSED:
/* when we are paused, signal to go to the new state */
GST_TASK_SIGNAL (task);
break;
case GST_TASK_STARTED:
/* if we were started, we'll go to the new state after the next
* iteration. */
break;
}
}
- 如果舊狀態是
GST_TASK_STOPPED
,則新狀態肯定是GST_TASK_STARTED
,則調用start_task
函數,該函數稍候會解釋。 - 如果舊狀態是
GST_TASK_PAUSED
,則增加相應的條件變量,這樣,任務對應的線程就會結束在此條件變量的阻塞,從而完成狀態轉換。 - 如果舊狀態是
GST_TASK_STARTED
,這個注釋的解釋沒有看懂,還要繼續琢磨一下,抱歉。
在start_task()
函數中,以gst_task_func()
函數為參數調用gst_task_pool_push()
,gst_task_pool_push()
是對default_push()
函數的封裝。default_push()
函數代碼如下:
static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func,
gpointer user_data, GError ** error)
{
TaskData *tdata;
tdata = g_slice_new (TaskData);
tdata->func = func;
tdata->user_data = user_data;
GST_OBJECT_LOCK (pool);
if (pool->pool)
g_thread_pool_push (pool->pool, tdata, error);
else {
g_slice_free (TaskData, tdata);
}
GST_OBJECT_UNLOCK (pool);
return NULL;
}
將func
和user_data
封裝成TaskData
結構體,作為參數調用上面提到的g_thread_pool_push()
函數將任務插入到線程池的任務列表中。TaskData
結構體的拆解在上面提到的default_func()
函數中,線程池中的每個線程啟動時都會調用該函數拆解參數,然后線程便切換到gst_task_func()
函數運行。
注意:這里的func
是gst_task_func()
函數,user_data
是任務對應的GstTask
結構體。而使用gst_task_new()
創建任務時傳入的自定義函數和參數是分別保存在GstTask
的task->func
和task->user_data
成員變量中,將會在gst_task_func()
中采用task->func (task->user_data)
的方式調用。
最后,每個線程中運行的其實都是gst_task_func()
函數,該函數關鍵代碼如下:
while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
GST_OBJECT_LOCK (task);
if (G_UNLIKELY (priv->scheduleable
&& GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
GST_OBJECT_UNLOCK (task);
break;
}
while (G_UNLIKELY (!priv->scheduleable
&& GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
g_rec_mutex_unlock (lock);
GST_TASK_SIGNAL (task);
GST_INFO_OBJECT (task, "Task going to paused");
GST_TASK_WAIT (task);
GST_INFO_OBJECT (task, "Task resume from paused");
GST_OBJECT_UNLOCK (task);
/* locking order.. */
g_rec_mutex_lock (lock);
GST_OBJECT_LOCK (task);
}
if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
GST_OBJECT_UNLOCK (task);
break;
} else {
GST_OBJECT_UNLOCK (task);
}
// 調用用戶自定義的函數。
task->func (task->user_data);
if (priv->scheduleable)
break;
}
如果任務被設置成GST_TASK_STOPPED
狀態,則退出循環,結束線程運行。若任務為暫停狀態,則在對應的條件變量上阻塞,否則,就一直循環調用用戶自定義函數處理數據流。