Using GStreamer appsrc and related plugins to mix video and audio, record, and push an RTMP stream


The online live-classroom product I am working on needs to record the screen shared by the teacher, together with the teacher's and the students' voices, into a single video file, so that students can watch a replay after class.

For the live-streaming service we use Tencent's video cloud, which comes with a ready-made SDK. The SDK's built-in recording API could not meet our requirements, however, so after evaluating both ffmpeg and GStreamer we decided to use GStreamer in the project.

Before writing any code, I first tested the idea on the command line:

gst-launch-1.0.exe -v --gst-debug-level=4 flvmux name=mux ! tee name=t ! queue ! filesink name=file location=test.flv \
t. ! queue ! rtmpsink location="rtmp://live.abc.com/live/........" \
adder name=mix ! queue ! audiorate ! audioconvert ! voaacenc ! mux.audio \
videotestsrc name=screen_src ! queue ! videorate ! x264enc ! mux.video \
audiotestsrc name=send_audio_src ! queue ! audiorate ! mix. \
audiotestsrc wave=5 name=receive_audio_src ! queue ! audiorate ! mix.

The command looks a bit complicated, but the logic is actually quite simple; the pipeline diagram below makes it easier to follow.



Replace the RTMP push URL in the command with your own. If you do not have one, simply remove the following part so that nothing is pushed:

t. ! queue ! rtmpsink location="rtmp://live.abc.com/live/........" \

Also note that this command is written for the MINGW64 bash environment on Windows. To run it in a Windows cmd prompt, just remove the trailing \ and the line breaks, as shown below.
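
For reference, here is the same pipeline collapsed onto a single line for cmd.exe (the RTMP URL still has to be replaced with your own):

gst-launch-1.0.exe -v --gst-debug-level=4 flvmux name=mux ! tee name=t ! queue ! filesink name=file location=test.flv t. ! queue ! rtmpsink location="rtmp://live.abc.com/live/........" adder name=mix ! queue ! audiorate ! audioconvert ! voaacenc ! mux.audio videotestsrc name=screen_src ! queue ! videorate ! x264enc ! mux.video audiotestsrc name=send_audio_src ! queue ! audiorate ! mix. audiotestsrc wave=5 name=receive_audio_src ! queue ! audiorate ! mix.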

The command uses a few key plugins; briefly:

adder: audio mixer; merges the two audio streams into one (a standalone test is shown right after this list)

voaacenc: audio encoder; encodes the raw audio stream to AAC

x264enc: video encoder; encodes the raw video stream to H.264

flvmux: FLV muxer; combines the video and audio into a single FLV stream

tee: splitter; turns one input into two outputs so each can be processed separately: one branch is saved to a file, the other is pushed to the streaming server

filesink: file writer; stores the incoming data in the specified file

rtmpsink: RTMP push; sends the incoming data to the specified streaming server
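
If you want to try the adder element on its own before building the full pipeline, a quick standalone test looks like this (a sketch, assuming autoaudiosink can pick a working audio output on your machine); you should hear two test tones mixed together:

gst-launch-1.0 audiotestsrc freq=440 ! adder name=mix ! audioconvert ! autoaudiosink audiotestsrc freq=880 ! mix.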

Now for the implementation. Our project is a Qt C++ application that currently runs only on Windows.

The GStreamer pipeline initialization and related state live in a GStreamProcess class:

GStreamProcess.h

#pragma once

#include "stdafx.h"

#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <gst/base/gstbaseparse.h>

typedef struct _AppSrcOption AppSrcOption;

struct _AppSrcOption
{
	_AppSrcOption()
		: pipeline(nullptr)
		, shareScreenAppsrc(nullptr)
		, sendAudioAppsrc(nullptr)
		, receiveAudioAppsrc(nullptr)
		, rtmp(nullptr)
		, bus(nullptr)
		, gloop(nullptr)
		, width(0)
		, height(0)
	{}

	GstElement *pipeline;
	GstElement *shareScreenAppsrc;
	GstElement *sendAudioAppsrc;
	GstElement *receiveAudioAppsrc;
	GstElement *rtmp;
	GstBus *bus;
	GMainLoop *gloop;

	QString recordFileName;

	iLiveSucCallback sucCallback;
	iLiveErrCallback errCallback;
	void* callbackData;

	uint width;
	uint height;

	QString pushStreamUrl;
};

//int gstreamerInit(AppSrcOption *app, int argc, char *argv[]);


class GStreamProcess : public QThread
{
	Q_OBJECT

public:
	AppSrcOption* app;

protected:		
	void run() Q_DECL_OVERRIDE;

signals:
	void resultReady(const QString &s);
};
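
The iLiveSucCallback and iLiveErrCallback types come from the Tencent iLive SDK and are not shown here; judging only from how they are invoked later in this post, they are roughly equivalent to function pointers along these lines (an assumption, check your SDK headers for the real definitions):

// assumed shape of the SDK callback types, based on the calls below
typedef void (*iLiveSucCallback)(void *data);
typedef void (*iLiveErrCallback)(int code, const char *errInfo, void *data);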

GStreamProcess.cpp

#include "stdafx.h"

GST_DEBUG_CATEGORY(appsrc_pipeline_debug);
#define GST_CAT_DEFAULT appsrc_pipeline_debug

static gboolean
bus_message(GstBus * bus, GstMessage * message, AppSrcOption * app)
{
	GST_DEBUG("got message %s",
		gst_message_type_get_name(GST_MESSAGE_TYPE(message)));

	switch (GST_MESSAGE_TYPE(message)) {
	case GST_MESSAGE_ERROR: {
		GError *err = NULL;
		gchar *dbg_info = NULL;

		gst_message_parse_error(message, &err, &dbg_info);

		gchar* elename = GST_OBJECT_NAME(message->src);

		g_printerr("ERROR from element %s: %s\n",
			elename, err->message);
		g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");

		app->errCallback(-90001, err->message, app->callbackData);

		g_error_free(err);
		g_free(dbg_info);
		g_main_loop_quit(app->gloop);
		break;
	}
	case GST_MESSAGE_EOS: {
		g_main_loop_quit(app->gloop);
		break;
	}
	default:
		break;
	}

	return TRUE;
}

void GStreamProcess::run()
{
	GError *error = NULL;

	int argc = 1;
	char *mock[] = { (char *)"empty" };
	char **argvp = mock;

	gst_init(&argc, &argvp);

	GST_DEBUG_CATEGORY_INIT(appsrc_pipeline_debug, "appsrc-pipeline", 0,
		"appsrc pipeline example");

	app->gloop = g_main_loop_new(NULL, TRUE);

	GstElement *pipeline = gst_parse_launch(
		"flvmux name=mux ! queue ! tee name=t ! queue ! filesink name=file "
		"t. ! queue ! rtmpsink name=rtmp "
		"adder name=mix ! queue ! audiorate ! audioconvert ! voaacenc ! mux.audio "
		"appsrc name=screen_src ! queue ! videorate ! x264enc ! mux.video "
		"appsrc name=send_audio_src ! queue ! audiorate ! mix. "
		"appsrc name=receive_audio_src ! queue ! audiorate ! mix.", NULL);
	g_assert(pipeline);

	app->bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
	g_assert(app->bus);

	/* add watch for messages */
	gst_bus_add_watch(app->bus, (GstBusFunc)bus_message, app);


	/* configure the screen share appsrc */

	app->shareScreenAppsrc = gst_bin_get_by_name(GST_BIN(pipeline), "screen_src");
	g_assert(app->shareScreenAppsrc);

	GstCaps *caps = gst_caps_new_simple("video/x-raw",
		"format", G_TYPE_STRING, "I420",
		"width", G_TYPE_INT, app->width,
		"height", G_TYPE_INT, app->height,
		"framerate", GST_TYPE_FRACTION, 15, 1,
		NULL);

	gst_app_src_set_caps(GST_APP_SRC(app->shareScreenAppsrc), caps);

	g_object_set(app->shareScreenAppsrc, "format", GST_FORMAT_TIME, NULL);
	g_object_set(app->shareScreenAppsrc, "is-live", TRUE, NULL);


	/* configure the send-audio appsrc */

	app->sendAudioAppsrc = gst_bin_get_by_name(GST_BIN(pipeline), "send_audio_src");
	g_assert(app->sendAudioAppsrc);

	caps = gst_caps_new_simple("audio/x-raw",
		"format", G_TYPE_STRING, "S16LE",
		"layout", G_TYPE_STRING, "interleaved",
		"channels", G_TYPE_INT, 2,
		"rate", G_TYPE_INT, 48000,
		NULL);

	gst_app_src_set_caps(GST_APP_SRC(app->sendAudioAppsrc), caps);
	g_object_set(app->sendAudioAppsrc, "format", GST_FORMAT_TIME, NULL);
	g_object_set(app->sendAudioAppsrc, "is-live", TRUE, NULL);


	/* configure the receive-audio appsrc */

	app->receiveAudioAppsrc = gst_bin_get_by_name(GST_BIN(pipeline), "receive_audio_src");
	g_assert(app->receiveAudioAppsrc);

	caps = gst_caps_new_simple("audio/x-raw",
		"format", G_TYPE_STRING, "S16LE",
		"layout", G_TYPE_STRING, "interleaved",
		"channels", G_TYPE_INT, 2,
		"rate", G_TYPE_INT, 48000,
		NULL);

	gst_app_src_set_caps(GST_APP_SRC(app->receiveAudioAppsrc), caps);
	g_object_set(app->receiveAudioAppsrc, "format", GST_FORMAT_TIME, NULL);
	g_object_set(app->receiveAudioAppsrc, "is-live", TRUE, NULL);


	/* configure the filesink */

	GstElement *filesink = gst_bin_get_by_name(GST_BIN(pipeline), "file");
	g_assert(filesink);

	g_object_set(G_OBJECT(filesink), "location", app->recordFileName.toStdString().c_str(), NULL);


	/* configure the rtmpsink */

	GstElement *rtmp = gst_bin_get_by_name(GST_BIN(pipeline), "rtmp");
	g_assert(rtmp);

	g_object_set(G_OBJECT(rtmp), "location", app->pushStreamUrl.toStdString().c_str(), NULL);

	/* go to playing */
	gst_element_set_state(pipeline, GST_STATE_PLAYING);

	//GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline_dot");

	app->pipeline = pipeline;

	app->sucCallback(app->callbackData);

	g_main_loop_run(app->gloop);

	GST_DEBUG("stopping");

	gst_element_set_state(app->pipeline, GST_STATE_NULL);

	gst_object_unref(app->bus);
	g_main_loop_unref(app->gloop);
}

A key detail in the code above is that the appsrc format property must be set to GST_FORMAT_TIME; without it, the video and audio cannot be synchronized. These are the relevant lines:

...
g_object_set(app->shareScreenAppsrc, "format", GST_FORMAT_TIME, NULL);
...
g_object_set(app->sendAudioAppsrc, "format", GST_FORMAT_TIME, NULL);
...
g_object_set(app->receiveAudioAppsrc, "format", GST_FORMAT_TIME, NULL);
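
In principle the same appsrc properties could also be declared directly inside the gst_parse_launch string instead of via g_object_set; a sketch of that variant (not what this project does):

appsrc name=screen_src format=time is-live=true ! queue ! videorate ! x264enc ! mux.video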

Then, wherever recording needs to start, create and start the GStreamer processing thread (the project is fairly large, so only the relevant part is shown):

...
m_pAppSrcOption = new AppSrcOption();
m_pAppSrcOption->recordFileName = filePath;
m_pAppSrcOption->pushStreamUrl = m_pushStreamUrl;

m_pAppSrcOption->callbackData = this;
m_pAppSrcOption->sucCallback = OnLocalRecordSuc;
m_pAppSrcOption->errCallback = OnLocalRecordErr;

m_pLocalRecordProcessThread = new GStreamProcess();
m_pLocalRecordProcessThread->app = m_pAppSrcOption;

connect(m_pLocalRecordProcessThread, &GStreamProcess::finished, m_pLocalRecordProcessThread, &QObject::deleteLater);
connect(m_pLocalRecordProcessThread, &GStreamProcess::finished, this, &MainForm::OnLocalRecordClose);

m_pLocalRecordProcessThread->start();

m_pFillBlankAudioTimer->start(2000);
...

Next, video frame data has to be fed in. When the screen is being shared, this project delivers every video frame through a callback; inside the callback we hand the data to the shareScreenAppsrc in the pipeline:

void MainForm::localVideoHook(const LiveVideoFrame* video_frame)
{
	if (m_pAppSrcOption && m_pAppSrcOption->pipeline)
	{
		GstBuffer *buffer;
		guint8 *ptr;
		ptr = (guint8 *)g_malloc(video_frame->dataSize * sizeof(uint8));
		if (NULL == ptr)
		{
			qDebug("OnLocalVideo::malloc failed!");
		}
		else
		{
			memcpy(ptr, video_frame->data, video_frame->dataSize);
			buffer = gst_buffer_new_wrapped((void*)ptr, video_frame->dataSize);

			// set the timestamp: current pipeline clock time minus the pipeline base time
			GST_BUFFER_PTS(buffer) = gst_clock_get_time(m_pAppSrcOption->pipeline->clock) - m_pAppSrcOption->pipeline->base_time;

			GstFlowReturn ret;
			// push the video frame into the appsrc
			g_signal_emit_by_name(m_pAppSrcOption->shareScreenAppsrc, "push-buffer", buffer, &ret);

			gst_buffer_unref(buffer);
		}
	}
}

In the code above, setting the timestamp is critical: without it, the videorate element in the pipeline fails because the buffers carry no timing information.
The timestamp used here is: current pipeline clock time minus the pipeline base time, i.e. the pipeline running time; see the helper sketch below.
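
The same calculation can be wrapped in a small helper. This sketch uses gst_element_get_clock() / gst_element_get_base_time() instead of reading the GstElement fields directly; the direct field access in the code above also works, this is just a more defensive variant:

static GstClockTime pipelineRunningTime(GstElement *pipeline)
{
	GstClock *clock = gst_element_get_clock(pipeline);  // returns a ref'd clock, or NULL
	if (!clock)
		return GST_CLOCK_TIME_NONE;                      // pipeline not running yet

	GstClockTime now = gst_clock_get_time(clock);
	GstClockTime base = gst_element_get_base_time(pipeline);
	gst_object_unref(clock);

	return now - base;                                   // running time = clock time - base time
}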

Audio frames are injected in the same way. There are two audio streams: one for the teacher's voice and one for the students' voices.
The teacher's voice:

void MainForm::sendAudioHook(const iLiveAudioFrame* audio_frame)
{

	if (m_pAppSrcOption && m_pAppSrcOption->pipeline)
	{
		GstBuffer *buffer;

		guint8 *ptr;
		ptr = (guint8 *)g_malloc(audio_frame->dataSize * sizeof(uint8));
		if (NULL == ptr)
		{
			qDebug("OnSendAudioCallback::malloc failed!");
		}
		else
		{
			memcpy(ptr, audio_frame->data, audio_frame->dataSize);
			buffer = gst_buffer_new_wrapped((void*)ptr, audio_frame->dataSize);

			GstClockTime pts = gst_clock_get_time(m_pAppSrcOption->pipeline->clock) - m_pAppSrcOption->pipeline->base_time;
			GST_BUFFER_PTS(buffer) = pts;
			m_lastWriteSendAudioTime = pts;

			GST_DEBUG("feed buffer");

			GstFlowReturn ret;
			g_signal_emit_by_name(m_pAppSrcOption->sendAudioAppsrc, "push-buffer", buffer, &ret);

			gst_buffer_unref(buffer);
		}
	}
}

The students' voices:

void MainForm::receiveAudioHook(const iLiveAudioFrame* audio_frame)
{

	if (m_pAppSrcOption && m_pAppSrcOption->pipeline)
	{
		GstBuffer *buffer;

		guint8 *ptr;
		ptr = (guint8 *)g_malloc(audio_frame->dataSize * sizeof(uint8));
		if (NULL == ptr)
		{
			qDebug("receiveAudioHook::malloc failed!");
		}
		else
		{
			memcpy(ptr, audio_frame->data, audio_frame->dataSize);
			buffer = gst_buffer_new_wrapped((void*)ptr, audio_frame->dataSize);

			GstClockTime pts = gst_clock_get_time(m_pAppSrcOption->pipeline->clock) - m_pAppSrcOption->pipeline->base_time;
			GST_BUFFER_PTS(buffer) = pts;
			m_lastWriteReceiveAudioTime = pts;

			GST_DEBUG("feed buffer");

			GstFlowReturn ret;
			g_signal_emit_by_name(m_pAppSrcOption->receiveAudioAppsrc, "push-buffer", buffer, &ret);

			gst_buffer_unref(buffer);
		}
	}
}

There is one more special case in this project: audio data does not flow continuously. If no student is on the mic, or the teacher's microphone is off, no audio frame callbacks arrive, and the mixer then blocks while waiting for data on the starved branch.

The workaround is a timer that injects a frame of silence every 2 seconds; the call m_pFillBlankAudioTimer->start(2000); above starts that timer. The code that injects the blank frames is shown below.

Updated 2018-03-15: injecting a blank frame every 2 seconds caused an audible thumping noise in the recorded video. The code now keeps a variable per stream that records the timestamp of the last real audio frame; when the 2-second timer fires, a blank frame is injected only if more than 5 seconds have passed since that timestamp, otherwise nothing is inserted.

// Fill in silence while there is no audio signal; if an audio stream stalls, adder blocks while mixing
void MainForm::OnFillBlankAudioTimer()
{
	if (m_pAppSrcOption && m_pAppSrcOption->pipeline && m_pAppSrcOption->receiveAudioAppsrc)
	{
		GstClockTime pts = gst_clock_get_time(m_pAppSrcOption->pipeline->clock) - m_pAppSrcOption->pipeline->base_time;

		if (GST_TIME_AS_SECONDS(pts - m_lastWriteReceiveAudioTime) > 5)
		{
			GstBuffer *buffer;
			guint size;
			GstFlowReturn ret;

			size = 3840; // 20 ms of S16LE stereo at 48 kHz: 960 samples x 2 channels x 2 bytes

			buffer = gst_buffer_new_allocate(NULL, size, NULL);

			// fill the whole buffer with 0x0 (silence)
			gst_buffer_memset(buffer, 0, 0x0, size);

			GST_BUFFER_PTS(buffer) = pts;

			g_signal_emit_by_name(m_pAppSrcOption->receiveAudioAppsrc, "push-buffer", buffer, &ret);
			gst_buffer_unref(buffer);
		}
	}

	if (m_pAppSrcOption && m_pAppSrcOption->pipeline && m_pAppSrcOption->sendAudioAppsrc)
	{
		GstClockTime pts = gst_clock_get_time(m_pAppSrcOption->pipeline->clock) - m_pAppSrcOption->pipeline->base_time;

		if (GST_TIME_AS_SECONDS(pts - m_lastWriteSendAudioTime) > 5)
		{
			GstBuffer *buffer;
			guint size;
			GstFlowReturn ret;

			size = 3840; // 20 ms of S16LE stereo at 48 kHz: 960 samples x 2 channels x 2 bytes

			buffer = gst_buffer_new_allocate(NULL, size, NULL);

			// fill the whole buffer with 0x0 (silence)
			gst_buffer_memset(buffer, 0, 0x0, size);

			GST_BUFFER_PTS(buffer) = pts;

			g_signal_emit_by_name(m_pAppSrcOption->sendAudioAppsrc, "push-buffer", buffer, &ret);
			gst_buffer_unref(buffer);
		}
	}
}

When mixing and recording need to stop, send an end-of-stream event to each appsrc in the pipeline. Once the pipeline has processed all remaining data it finishes normally and shuts down.

void MainForm::onBtnStopPushStream()
{
	QMessageBox::StandardButton ret = QMessageBox::question(this, FromBits("Confirm"), FromBits("Stop video recording? (Recording multiple times produces multiple video files and hurts the replay experience, so try to avoid it.)"));

	if (ret == QMessageBox::Yes)
	{
		stopPushStream();

		if (m_pAppSrcOption && m_pAppSrcOption->pipeline)
		{
			GstFlowReturn ret;
			g_signal_emit_by_name(m_pAppSrcOption->shareScreenAppsrc, "end-of-stream", &ret);
			g_signal_emit_by_name(m_pAppSrcOption->sendAudioAppsrc, "end-of-stream", &ret);
			g_signal_emit_by_name(m_pAppSrcOption->receiveAudioAppsrc, "end-of-stream", &ret);

			m_pFillBlankAudioTimer->stop();
		}

		m_pushStreamStatus = E_ChangingPushStream;
		setStatus(m_status);
	}
} 

Earlier, when starting the thread, we registered for its finished signal with the line below, so OnLocalRecordClose is called when the thread ends; that is where the UI controls can be updated and resources released (a sketch of this handler follows the snippet).

...
connect(m_pLocalRecordProcessThread, &GStreamProcess::finished, this, &MainForm::OnLocalRecordClose);
...
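
OnLocalRecordClose itself is not shown in this post; a minimal sketch of what it might do (only m_pAppSrcOption comes from the code above, the UI part is left as a comment):

void MainForm::OnLocalRecordClose()
{
	// The GStreamer thread has finished: the pipeline was already set to
	// GST_STATE_NULL in GStreamProcess::run(), so only our own objects remain.
	if (m_pAppSrcOption)
	{
		if (m_pAppSrcOption->pipeline)
			gst_object_unref(m_pAppSrcOption->pipeline);

		delete m_pAppSrcOption;
		m_pAppSrcOption = nullptr;
	}

	// re-enable the recording / push-stream UI controls here
}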

This was my first time developing with GStreamer, so there were plenty of detours and pitfalls along the way. In the end, though, the required functionality was all there, and both the live recording and the pushed stream work well.

One problem remains with this approach: if the RTMP push fails, the whole pipeline errors out and stops. That still needs to be solved.

I consulted a lot of material during development; the most useful references were:

  • appsrc demo code

https://gist.github.com/nzjrs/725122/16ceee88aafae389bab207818e0661328921e1ab (may require a proxy to access)

http://blog.csdn.net/u010312436/article/details/53610599

https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html


  • GStreamer clock mechanism

https://gstreamer.freedesktop.org/documentation/application-development/advanced/clocks.html


  • GStreamer logging and generating pipeline graphs

https://gstreamer.freedesktop.org/documentation/tutorials/basic/debugging-tools.html

https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer/html/gst-running.html

