1.MediaPipe為何提出
MediaPipe為移動、桌面/雲、web和物聯網設備構建世界級ML解決方案和應用程序。整個MediaPipe有以下四個特點:
- 端到端加速:內置的快速ML推理和處理框架甚至可以在普通硬件上加速
- 構建一次,部署在任何地方:統一的解決方案工作在Android, iOS,桌面/雲,網絡和物聯網
- 現成的解決方案:尖端的ML解決方案,展示了框架的全部功能
- 免費和開放源碼:Apache 2.0下的框架和解決方案,完全可擴展和可定制
2.MediaPipe框架的組件
2.1 計算單元(Calculator)
每個計算單元都是圖的一個節點,下文主要描述怎樣構建新的計算單元,怎樣初始化計算單元,怎樣執行計算單元的輸入/輸出流,時間戳和選項。一個計算單元可能不接受或接受多個輸入流,輸出零個或者多個輸出流。
(1) 計算單元基類
通過定義CalculatorBase
類的新子類、實現許多方法並向Mediapipe注冊新子類來創建計算單元。一個新的計算單元至少必須實現以下四個方法。
-
GetContract()
計算單元可以在GetContract()
中指定輸入和輸出的預期類型。當圖形(Graph)初始化時,框架調用一個靜態方法來驗證連接的輸入和輸出數據包的類型是否與此規范中的信息匹配。 -
Open()
圖啟動后,框架調用Open()
。輸入端數據包在此刻處於准備狀態。Open()
解釋節點配置操作,並准備計算單元的每個圖運行狀態。這個函數也可以將數據包寫到計算單元的輸出中。Open()
期間出現的錯誤可能會終止圖形運行。 -
Process()
對於有輸入的計算單元,只要至少有一個輸入流有可用的包,框架就會重復調用Process()
。框架默認情況下保證所有輸入都具有相同的時間戳。當啟用並行執行時,可以同時調用多個Process()
調用。如果Process()
期間發生錯誤,框架調用Close()
,圖(Graph)運行終止。 -
Close
在所有對Process()
的調用結束后或當所有輸入流關閉時,框架調用close()
。如果調用了Open()
並成功了,則始終調用此函數,即使由於錯誤而終止了圖形運行,也將始終調用此函數。在Close()
期間,任何輸入流都沒有可用的輸入,但它仍然可以訪問輸入端包,因此可以寫入輸出。在Close()
返回后,計算節點應該被認為是死節點。圖形完成運行后,計算單元對象將被銷毀。
下面是來自CalculatorBase.h
的代碼片段。
class CalculatorBase {
public:
...
// The subclasses of CalculatorBase must implement GetContract.
// ...
static ::MediaPipe::Status GetContract(CalculatorContract* cc);
// Open is called before any Process() calls, on a freshly constructed
// calculator. Subclasses may override this method to perform necessary
// setup, and possibly output Packets and/or set output streams' headers.
// ...
virtual ::MediaPipe::Status Open(CalculatorContext* cc) {
return ::MediaPipe::OkStatus();
}
// Processes the incoming inputs. May call the methods on cc to access
// inputs and produce outputs.
// ...
virtual ::MediaPipe::Status Process(CalculatorContext* cc) = 0;
// Is called if Open() was called and succeeded. Is called either
// immediately after processing is complete or after a graph run has ended
// (if an error occurred in the graph). ...
virtual ::MediaPipe::Status Close(CalculatorContext* cc) {
return ::MediaPipe::OkStatus();
}
...
};
(2) 計算單元的生命周期
在MediaPipe圖的初始化過程中,框架調用GetContract()
靜態方法來確定需要什么類型的包。該框架為每個圖形運行構造和銷毀整個計算單元(例如,一次視頻或一次圖像)。在圖形運行中保持不變的昂貴或大型對象應該作為輸入端包提供,這樣計算就不會在后續運行中重復。
初始化后,對於圖的每次運行,順序如下:
Open()
Process()
(重復的)Close()
框架調用Open()
來初始化計算單元。Open()
應該解釋所有選項,並設置計算單元的每個圖形運行狀態。Open()
可以獲取輸入端數據包並將數據包寫入計算單元輸出。如果合適的話,它應該調用SetOffset()
來減少輸入流的潛在的數據流緩沖。
對於有輸入的計算單元,只要至少有一個輸入有可用的包,框架就調用Process()
。該框架保證所有輸入都具有相同的時間戳,並且時間戳會隨着對Process()
的每次調用而增加,並且所有數據包都被傳遞。因此,在調用Process()
時,一些輸入可能沒有任何數據包。數據包丟失的輸入似乎會產生一個空數據包(沒有時間戳)。
在所有對Process()
的調用之后,框架都會調用Close()
。所有的輸入都已耗盡,但是Close()
可以訪問輸入端包並可以寫輸出。關閉返回后,計算單元被銷毀。
沒有輸入的計算單元稱為源。只要源計算單元返回Ok狀態,它就繼續調用Process()。源計算單元通過返回一個停止狀態(即MediaPipe::tool::StatusStop)來指示它已被耗盡。
(3) 識別輸入和輸出
計算單元的公共接口由一組輸入流和輸出流組成。在CalculatorGraphConfiguration
中,一些計算單元的輸出使用命名流連接到其他計算單元的輸入。流名稱通常為小寫,而輸入和輸出標記通常為大寫。在下面的示例中,使用名為video_stream
的流將標記名為VIDEO
的輸出連接到標記名為VIDEO_IN
的輸入。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "INPUT:combined_input"
output_stream: "VIDEO:video_stream"
}
node {
calculator: "SomeVideoCalculator"
input_stream: "VIDEO_IN:video_stream"
output_stream: "VIDEO_OUT:processed_video"
}
輸入和輸出流可以通過索引編號、標簽名稱或標簽名稱和索引編號的組合來標識。在下面的示例中可以看到輸入和輸出標識符的一些示例。SomeAudioVideoCalculator
通過標簽識別其視頻輸出,通過標簽和索引的組合識別其音頻輸出。帶有標記VIDEO
的輸入連接到名為video_stream
的流。標記AUDIO和索引0和1的輸出連接到名為audio_left和audio_right的流。SomeAudioCalculator
僅通過索引(不需要標簽)識別它的音頻輸入。
# Graph describing calculator SomeAudioVideoCalculator
node {
calculator: "SomeAudioVideoCalculator"
input_stream: "combined_input"
output_stream: "VIDEO:video_stream"
output_stream: "AUDIO:0:audio_left"
output_stream: "AUDIO:1:audio_right"
}
node {
calculator: "SomeAudioCalculator"
input_stream: "audio_left"
input_stream: "audio_right"
output_stream: "audio_energy"
}
在計算單元實現中,輸入和輸出也通過標記名稱和索引號來標識。在下面的函數中輸入和輸出都有標識:
- 根據索引號:組合后的輸入流僅通過索引0來標識。
- 標簽名稱:通過標簽名稱“video”來標識視頻輸出流。
- 根據標簽名和索引號:輸出的音頻流由標簽名audio和索引號0和1的組合來識別。
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
public:
static ::mediapipe::Status GetContract(CalculatorContract* cc) {
cc->Inputs().Index(0).SetAny();
// SetAny() is used to specify that whatever the type of the
// stream is, it's acceptable. This does not mean that any
// packet is acceptable. Packets in the stream still have a
// particular type. SetAny() has the same effect as explicitly
// setting the type to be the stream's type.
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
return ::mediapipe::OkStatus();
}
(4) 計算單元執行
在非源節點上調用Process()
必須返回::mediapipe::OkStatus()
來表示一切正常,或者返回任何其他狀態代碼來表示錯誤。如果一個非源計算單元返回tool::StatusStop()
,那么這表示這個圖被提前取消了。在這種情況下,所有源計算單元和圖形輸入流將被關閉(其余的包將通過圖形傳播)。圖中的源節點將繼續調用Process()
,只要它返回::mediapipe::OkStatus()
。為了表示不再生成數據,返回tool::StatusStop()
。任何其他狀態都表示發生了錯誤。Close()
返回::mediapipe::OkStatus()
表示成功。任何其他狀態都表示失敗。
下面是基本的Process()
函數。它使用Input()
方法(只有在計算單元只有一個輸入時才可以使用)來請求輸入數據。然后,它使用std::unique_ptr
分配輸出包所需的內存,並執行計算。完成后,它會在將指針添加到輸出流時釋放指針。
::util::Status MyCalculator::Process() {
const Matrix& input = Input()->Get<Matrix>();
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
// do your magic here....
// output->row(n) = ...
Output()->Add(output.release(), InputTimestamp());
return ::mediapipe::OkStatus();
}
(5) 計算單元Demo
本節討論PacketClonerCalculator
的實現,它執行一項相對簡單的工作,並在許多計算單元圖中使用。PacketClonerCalculator
只是根據需要生成其最近的輸入數據流的副本。
當到達的數據包的時間戳沒有完全對齊時,PacketClonerCalculator
非常有用。假設我們有一個房間,里面有麥克風、光傳感器和一個正在收集感官數據的攝像機。每個傳感器獨立工作,間歇地收集數據。設每個傳感器的輸出為:
- 傳聲器=響度(以房間內聲音的分貝為單位)
- 光傳感器=房間亮度(整數)
- 攝像機=房間RGB圖像幀(ImageFrame)
我們的簡單感知管道被設計來處理來自這3個傳感器的數據,這樣在任何時候當我們有來自相機的圖像幀數據,與最后收集的麥克風音量數據和光傳感器亮度數據同步。要用MediaPipe做到這一點,我們的感知管道有3個輸入流: - room_mic_signal——這個輸入流中的每個數據包都是整數數據,表示音頻在的房間中有多響,並帶有時間戳。
- room_lightening_sensor—這個輸入流中的每個數據包都是整數數據,表示照亮房間的亮度,並帶有時間戳
- room_video_tick_signal—該輸入流中的每個數據包都是視頻數據的圖像幀,表示從房間內的攝像機收集的視頻,並帶有時間戳。
下面是PacketClonerCalculator
的實現。您可以看到GetContract()
、Open()
和Process()
方法以及實例變量current_
,它包含最近的輸入數據包。
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
class PacketClonerCalculator : public CalculatorBase {
public:
static ::mediapipe::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
// cc->Inputs().NumEntries() returns the number of input streams
// for the PacketClonerCalculator
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
// cc->Inputs().Index(i) returns the input stream pointer by index
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return ::mediapipe::OkStatus();
}
::mediapipe::Status Open(CalculatorContext* cc) final {
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
// Sets the output stream of index i header to be the same as
// the header for the input stream of index i
}
}
return ::mediapipe::OkStatus();
}
::mediapipe::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output if the tick signal is non-empty.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
// Add a packet to output stream of index i a packet from inputstream i
// with timestamp common to all present inputs
//
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
// if current_[i], 1 packet buffer for input stream i is empty, we will set
// next allowed timestamp for input stream i to be current timestamp + 1
}
}
}
return ::mediapipe::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
通常,計算單元只有一個.cc
文件。不需要.h
,因為mediapipe使用注冊來讓它知道計算單元。定義計算單元類之后,使用宏調用REGISTER_CALCULATOR(calculator_class_name)
注冊它。
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
node {
calculator: "PacketClonerCalculator"
input_stream: "room_mic_signal"
input_stream: "room_lighting_sensor"
input_stream: "room_video_tick_signal"
output_stream: "cloned_room_mic_signal"
output_stream: "cloned_lighting_sensor"
}
下面的圖表顯示了PacketClonerCalculator
如何根據它的一系列輸入數據包(頂部)定義它的輸出數據包(底部)。
2.2 圖結構(Graphs)
(1) 圖配置
GraphConfig
是描述MediaPipe圖的拓撲和功能的規范。在規范中,圖中的節點表示特定計算單元的實例。節點的所有必要配置,如類型、輸入和輸出,都必須在規范中描述。節點的描述還可以包括幾個可選字段,比如同步中討論的特定於節點的選項、輸入策略和executor。
GraphConfig
還有其他幾個字段來配置全局圖形級設置,例如,圖執行器配置、線程數量和輸入流的最大隊列大小。有幾個圖級別的設置對於在不同平台(例如桌面和移動平台)上調優圖的性能非常有用。例如,在移動設備上,將一個重型模型推斷計算單元附加到一個單獨的執行器可以提高實時應用程序的性能,因為它支持線程局部化。
下面是一個簡單的GraphConfig
示例,其中我們有一系列的直通計算單元:
# This graph named main_pass_throughcals_nosubgraph.pbtxt contains 4
# passthrough calculators.
input_stream: "in"
node {
calculator: "PassThroughCalculator"
input_stream: "in"
output_stream: "out1"
}
node {
calculator: "PassThroughCalculator"
input_stream: "out1"
output_stream: "out2"
}
node {
calculator: "PassThroughCalculator"
input_stream: "out2"
output_stream: "out3"
}
node {
calculator: "PassThroughCalculator"
input_stream: "out3"
output_stream: "out4"
}
(2) 子圖
要將CalculatorGraphConfig
模塊化為子模塊並幫助重用感知解決方案,可以將MediaPipe圖定義為子圖。子圖的公共接口由一組輸入和輸出流組成,類似於計算單元的公共接口。然后可以將子圖像計算單元一樣包含在CalculatorGraphConfig
中。當從CalculatorGraphConfig
加載MediaPipe圖時,每個子圖節點都被相應的計算單元圖替換。因此,子圖的語義和性能與相應的計算單元圖是相同的。
下面是如何創建名為TwoPassThroughSubgraph
的子圖的示例。
- 定義子圖
# This subgraph is defined in two_pass_through_subgraph.pbtxt
# and is registered as "TwoPassThroughSubgraph"
type: "TwoPassThroughSubgraph"
input_stream: "out1"
output_stream: "out3"
node {
calculator: "PassThroughculator"
input_stream: "out1"
output_stream: "out2"
}
node {
calculator: "PassThroughculator"
input_stream: "out2"
output_stream: "out3"
}
子圖的公共接口包括:
- 圖輸入流
- 圖形輸出流
- 圖形輸入端包
- 圖形輸出端包
- 使用構建規則
mediapipe_simple_subgraph
注冊子圖。參數register_as
定義了新子圖的組件名。
# Small section of BUILD file for registering the "TwoPassThroughSubgraph"
# subgraph for use by main graph main_pass_throughcals.pbtxt
mediapipe_simple_subgraph(
name = "twopassthrough_subgraph",
graph = "twopassthrough_subgraph.pbtxt",
register_as = "TwoPassThroughSubgraph",
deps = [
"//mediapipe/calculators/core:pass_through_calculator",
"//mediapipe/framework:calculator_graph",
],
)
- 使用主圖中的子圖
# This main graph is defined in main_pass_throughcals.pbtxt
# using subgraph called "TwoPassThroughSubgraph"
input_stream: "in"
node {
calculator: "PassThroughCalculator"
input_stream: "in"
output_stream: "out1"
}
node {
calculator: "TwoPassThroughSubgraph"
input_stream: "out1"
output_stream: "out3"
}
node {
calculator: "PassThroughCalculator"
input_stream: "out3"
output_stream: "out4"
}
(3) 循環
默認情況下,MediaPipe要求計算單元圖是非循環的,並將圖中的循環視為錯誤。如果圖要具有循環,則需要在圖配置中對循環進行注釋。
請使用CalculatorGraphTest
。循環單元測試在mediapipe/framework/calculator_graph_test
。cc作為示例代碼。下面是測試中的循環圖。加法器輸出的和是整數源計算單元生成的整數的和。
-
背部邊緣注釋
我們要求每個循環中的一條邊被標注為后邊。這允許MediaPipe的拓撲排序在刪除所有后邊緣后工作。
通常有多種方法來選擇后邊緣。哪些邊被標記為后邊會影響哪些節點被認為是上游節點,哪些節點被認為是下游節點,進而影響MediaPipe分配給節點的優先級。
例如,CalculatorGraphTest.Cycle
測試將old_sum
邊標記為后邊,因此將延遲節點視為加法器節點的下游節點,並給予較高的優先級。或者,我們可以將延遲節點的輸入和標記為后邊,在這種情況下,延遲節點將被視為加法器節點的上游節點,並被給予較低的優先級。 -
初始化數據流
當整數源的第一個整數到達時,加法器計算單元可以運行,我們需要一個初始包,值為0,時間戳相同,在加法器的old_sum
輸入流上。這個初始包應該由Open()
方法中的延遲計算單元輸出。 -
循環延遲
每個循環都應該延遲前一個和輸出與下一個整數輸入的對齊。這也是由延遲節點完成的。因此延遲節點需要知道以下整數源計算單元的時間戳:
- 第一個輸出的時間戳。
- 連續輸出之間的時間戳增量。
我們計划添加一個替代的調度策略,它只關心包的順序,而忽略包的時間戳,這將消除這種不便。
-
當一個輸入流完成時,提前終止計算單元
默認情況下,當完成非源計算單元的所有輸入流時,MediaPipe調用其Close()
方法。在示例圖行中,我們希望在完成整數源后立即停止加法器節點。這是通過使用另一個輸入流處理程序EarlyCloseInputStreamHandler
配置加法器節點來完成的。 -
相關的源代碼
延遲計算單元
請注意Open()
中輸出初始數據包的代碼和Process()
中為輸入數據包添加(單位)延遲的代碼。如上所述,這個延遲節點假設它的輸出流與數據包時間戳為0,1,2,3,…的輸入流一起使用。
class UnitDelayCalculator : public Calculator {
public:
static ::util::Status FillExpectations(
const CalculatorOptions& extendable_options, PacketTypeSet* inputs,
PacketTypeSet* outputs, PacketTypeSet* input_side_packets) {
inputs->Index(0)->Set<int>("An integer.");
outputs->Index(0)->Set<int>("The input delayed by one time unit.");
return ::mediapipe::OkStatus();
}
::util::Status Open() final {
Output()->Add(new int(0), Timestamp(0));
return ::mediapipe::OkStatus();
}
::util::Status Process() final {
const Packet& packet = Input()->Value();
Output()->AddPacket(packet.At(packet.Timestamp().NextAllowedInStream()));
return ::mediapipe::OkStatus();
}
};
圖配置
請注意back_edge
注釋和替代的input_stream_handler
。
node {
calculator: 'GlobalCountSourceCalculator'
input_side_packet: 'global_counter'
output_stream: 'integers'
}
node {
calculator: 'IntAdderCalculator'
input_stream: 'integers'
input_stream: 'old_sum'
input_stream_info: {
tag_index: ':1' # 'old_sum'
back_edge: true
}
output_stream: 'sum'
input_stream_handler {
input_stream_handler: 'EarlyCloseInputStreamHandler'
}
}
node {
calculator: 'UnitDelayCalculator'
input_stream: 'sum'
output_stream: 'old_sum'
}
2.3 數據包(Packets)
每個計算單元都是圖形的一個節點。我們將描述如何創建新的計算單元、如何初始化計算單元、如何執行其計算、輸入和輸出流、時間戳和選項
數據包通常是用MediaPipe::Adopt()
(從package .h中)創建的。
// Create some data.
auto data = gtl::MakeUnique<MyDataClass>("constructor_argument");
// Create a packet to own the data.
Packet p = Adopt(data.release());
// Make a new packet with the same data and a different timestamp.
Packet p2 = p.At(Timestamp::PostStream());
通過packet::Get<T>()
訪問數據包內的數據
2.4 同步性(synchronization)
(1) 調度機制
MediaPipe圖中的數據處理發生在定義為CalculatorBase
子類的處理節點中。調度系統決定何時運行每個計算單元。
每個圖至少有一個調度程序隊列。每個調度器隊列只有一個執行器。節點被靜態地分配給一個隊列(因此分配給一個執行器)。默認情況下,有一個隊列,它的執行器是一個線程池,根據系統的能力有許多線程。
每個節點都有一個調度狀態,可以是未就緒、就緒或正在運行。准備函數確定節點是否准備運行。在圖形初始化時,每當節點結束運行,以及每當節點的輸入狀態發生變化時,都會調用此函數。
使用的就緒函數取決於節點的類型。沒有流輸入的節點稱為源節點;源節點總是准備運行,直到它們告訴框架它們沒有更多的數據要輸出,這時它們就會關閉。
如果非源節點有要處理的輸入,並且根據節點的輸入策略設置的條件(下面將討論),這些輸入形成了一個有效的輸入集,那么它們就准備好了。
大多數節點使用默認的輸入策略,但有些節點指定不同的輸入策略。
注意:因為更改輸入策略會更改計算單元代碼期望從其輸入得到的保證,所以通常不可能將計算單元與任意輸入策略混合匹配。因此,應該為使用特殊輸入策略的計算單元編寫它,並聲明它。
當一個節點准備就緒時,一個任務被添加到相應的調度程序隊列中,這是一個優先隊列。優先級函數目前是固定的,考慮到節點的靜態屬性及其在圖中的拓撲排序。例如,靠近圖的輸出端的節點具有更高的優先級,而源節點具有最低的優先級。
每個隊列由一個執行器提供服務,執行器負責通過調用計算單元的代碼實際運行任務。可提供和配置不同的執行器;這可以用來定制執行資源的使用,例如在低優先級線程上運行某些節點。
(2) 時間戳同步
MediaPipe圖形執行是分散的:沒有全局時鍾,不同的節點可以同時處理來自不同時間戳的數據。這允許通過流水線獲得更高的吞吐量。
然而,時間信息對於許多感知工作流是非常重要的。接收多個輸入流的節點通常需要以某種方式協調它們。例如,一個對象檢測器可以從一個幀中輸出一列邊界矩形,並且這個信息可以被輸入到一個渲染節點,該節點應該與原始幀一起處理它。
因此,MediaPipe框架的關鍵職責之一是為節點提供輸入同步。就框架機制而言,時間戳的主要作用是充當同步鍵。
此外,MediaPipe被設計為支持確定性操作,這在許多場景(測試、模擬、批處理等)中非常重要,同時允許圖在需要滿足實時約束的地方放松確定性。
同步和決定論這兩個目標是幾種設計選擇的基礎。值得注意的是,推入給定流的數據包必須具有單調遞增的時間戳:這不僅是對許多節點有用的假設,而且同步邏輯也依賴於此。每個流都有一個時間戳限制,這是流上新包允許的最低時間戳。當一個時間戳為T的數據包到達時,邊界自動推進到T+1,反映了單調要求。這允許框架確定沒有更多時間戳小於T的數據包會到達。
(3) 輸入策略
同步在每個節點上使用,節點指定的輸入策略本地處理。
由DefaultInputStreamHandler
定義的默認輸入策略提供了確定性的輸入同步,並提供了以下保證:
- 如果在多個輸入流上提供具有相同時間戳的數據包,那么無論它們的到達順序如何,它們總是一起被處理。
- 輸入集嚴格按照時間戳升序進行處理。
- 沒有數據包被丟棄,處理是完全確定的。
- 根據上述保證,節點可以盡快處理數據。
注意:這樣做的一個重要結果是,如果計算單元在輸出包時總是使用當前的輸入時間戳,那么輸出將必然服從單調遞增的時間戳要求。
警告:另一方面,不能保證輸入包總是對所有流可用。
為了解釋它是如何工作的,我們需要引入固定時間戳的定義。如果流中的時間戳低於時間戳的限制,我們就說它是穩定的。換句話說,一旦輸入在該時間戳處的狀態不可挽回地已知,流的時間戳就確定下來了:要么存在一個包,要么確定具有該時間戳的包不會到達。
注意:由於這個原因,MediaPipe還允許流生成器顯式地將時間戳的綁定比上一個包的綁定更進一步,即提供更緊密的綁定。這可以讓下游節點更快地確定它們的輸入。
如果時間戳在每個流上都已確定,那么它將跨多個流確定。而且,如果一個時間戳被解決了,就意味着之前所有的時間戳也被解決了。因此,已確定的時間戳可以按升序確定地處理。
根據這個定義,如果存在一個時間戳,該時間戳在所有輸入流中都得到了解決,並且至少在一個輸入流中包含了一個包,那么具有默認輸入策略的計算單元就准備好了。輸入策略將一個已解決時間戳的所有可用數據包作為單個輸入集提供給計算單元。
這種確定性行為的一個結果是,對於具有多個輸入流的節點來說,理論上可以無限制地等待時間戳的確定,同時可以無限制地緩沖數量的數據包。(考慮一個有兩個輸入流的節點,其中一個一直發送數據包,而另一個什么也不發送,並且不推進邊界。)
因此,我們還提供了定制的輸入策略:例如,將輸入分割到由SyncSetInputStreamHandler
定義的不同同步集中,或者完全避免同步並在輸入到達時立即處理由立即inputstreamhandler
定義的輸入。
(4) 流控制
有兩種主要的流量控制機制。當流上緩沖的數據包達到由CalculatorGraphConfig::max_queue_size
定義的(可配置的)限制時,反壓力機制會對上游節點的執行進行節流。此機制維護確定性行為,並包括一個死鎖避免系統,該系統在需要時放松配置的限制。
第二個系統由插入特殊節點組成,這些節點可以根據FlowLimiterCalculator
定義的實時約束(通常使用自定義輸入策略)來丟棄數據包。例如,一種常見的模式將流控制節點放置在子圖的輸入處,並帶有從最終輸出到流控制節點的環回連接。流控制節點因此能夠跟蹤在下游圖中有多少時間戳正在被處理,如果這個計數達到(可配置的)限制,就會丟棄數據包;由於數據包是在上游丟棄的,因此我們避免了部分處理時間戳然后在中間階段丟棄數據包所造成的工作浪費。
這種基於計算單元的方法使圖可以控制在哪里可以丟棄數據包,並允許根據資源約束靈活地適應和定制圖的行為。
2.5 GPU
(1) 概述
MediaPipe支持用於GPU計算和渲染的計算單元節點,並允許合並多個GPU節點,以及將它們與基於CPU的計算單元節點混合。移動平台上有幾個GPU APIs(如OpenGL ES、Metal和Vulkan)。MediaPipe並不嘗試提供單一的跨API GPU抽象。各個節點可以使用不同的API編寫,從而允許它們在需要時利用平台特定的特性。
GPU支持對於移動平台的良好性能至關重要,特別是對於實時視頻。MediaPipe使開發者能夠編寫支持GPU使用的GPU兼容計算單元:
- 設備上的實時處理,而不僅僅是批處理
- 視頻渲染和效果,不僅僅是分析
下面是MediaPipe中GPU支持的設計原則
- 基於gpu的計算單元應該能夠出現在圖的任何地方,而不一定用於屏幕上的渲染。
- 幀數據從一個基於gpu的計算單元傳輸到另一個計算單元應該是快速的,而且不會導致昂貴的復制操作。
- CPU和GPU之間的幀數據傳輸應該在平台允許的情況下盡可能高效。
- 因為不同的平台可能需要不同的技術來獲得最佳性能,所以API應該允許在幕后實現方式上的靈活性。
- 一個計算單元應該被允許在使用GPU的全部或部分操作的最大靈活性,如果必要的話,將它與CPU結合。
(2) OpenGL ES支持
MediaPipe在Android/Linux上支持OpenGL ES到3.2版本,在iOS上支持es3.0版本。此外,MediaPipe還支持iOS上的Metal。
運行機器學習推理計算單元和圖形需要OpenGL ES 3.1或更高版本(在Android/Linux系統上)。
MediaPipe允許圖在多個GL上下文中運行OpenGL。舉例來說,這可能是非常有用的在圖表結合較慢的GPU推理路徑(例如,在10幀/秒)和更快的GPU渲染路徑(如30 FPS):因為一個GL上下文對應於一個連續的命令隊列,這兩個任務使用相同的背景將會減少渲染的幀速率。
MediaPipe使用多個上下文解決的一個挑戰是跨它們進行通信的能力。一個示例場景是一個同時發送到呈現和推理路徑的輸入視頻,呈現需要訪問推理的最新輸出。
一個OpenGL上下文不能被多個線程同時訪問。此外,在某些Android設備上,在同一線程上切換活動GL上下文可能會很慢。因此,我們的方法是為每個上下文設置一個專用線程。每個線程發出GL命令,在其上下文上建立一個串行命令隊列,然后由GPU異步執行。
(3) GPU計算單元的生命周期
本節介紹了基於GlSimpleCalculator
基類派生的GPU計算單元處理方法的基本結構。GPU計算單元是一個例子。方法LuminanceCalculator::GlRender
是從GlSimpleCalculator::Process
中調用的。
// Converts RGB images into luminance images, still stored in RGB format.
// See GlSimpleCalculator for inputs, outputs and input side packets.
class LuminanceCalculator : public GlSimpleCalculator {
public:
::mediapipe::Status GlSetup() override;
::mediapipe::Status GlRender(const GlTexture& src,
const GlTexture& dst) override;
::mediapipe::Status GlTeardown() override;
private:
GLuint program_ = 0;
GLint frame_;
};
REGISTER_CALCULATOR(LuminanceCalculator);
::mediapipe::Status LuminanceCalculator::GlRender(const GlTexture& src,
const GlTexture& dst) {
static const GLfloat square_vertices[] = {
-1.0f, -1.0f, // bottom left
1.0f, -1.0f, // bottom right
-1.0f, 1.0f, // top left
1.0f, 1.0f, // top right
};
static const GLfloat texture_vertices[] = {
0.0f, 0.0f, // bottom left
1.0f, 0.0f, // bottom right
0.0f, 1.0f, // top left
1.0f, 1.0f, // top right
};
// program
glUseProgram(program_);
glUniform1i(frame_, 1);
// vertex storage
GLuint vbo[2];
glGenBuffers(2, vbo);
GLuint vao;
glGenVertexArrays(1, &vao);
glBindVertexArray(vao);
// vbo 0
glBindBuffer(GL_ARRAY_BUFFER, vbo[0]);
glBufferData(GL_ARRAY_BUFFER, 4 * 2 * sizeof(GLfloat), square_vertices,
GL_STATIC_DRAW);
glEnableVertexAttribArray(ATTRIB_VERTEX);
glVertexAttribPointer(ATTRIB_VERTEX, 2, GL_FLOAT, 0, 0, nullptr);
// vbo 1
glBindBuffer(GL_ARRAY_BUFFER, vbo[1]);
glBufferData(GL_ARRAY_BUFFER, 4 * 2 * sizeof(GLfloat), texture_vertices,
GL_STATIC_DRAW);
glEnableVertexAttribArray(ATTRIB_TEXTURE_POSITION);
glVertexAttribPointer(ATTRIB_TEXTURE_POSITION, 2, GL_FLOAT, 0, 0, nullptr);
// draw
glDrawArrays(GL_TRIANGLE_STRIP, 0, 4);
// cleanup
glDisableVertexAttribArray(ATTRIB_VERTEX);
glDisableVertexAttribArray(ATTRIB_TEXTURE_POSITION);
glBindBuffer(GL_ARRAY_BUFFER, 0);
glBindVertexArray(0);
glDeleteVertexArrays(1, &vao);
glDeleteBuffers(2, vbo);
return ::mediapipe::OkStatus();
}
以上設計原則導致MediaPipe GPU支持的設計選擇如下:
-
我們有一個GPU數據類型,稱為GpuBuffer,用於表示圖像數據,為GPU使用而優化。這種數據類型的確切內容是不透明的,並且是特定於平台的。
-
基於composition的低級API,任何想要使用GPU的計算單元都會創建並擁有一個GlCalculatorHelper類的實例。這個類提供了一個平台無關的API,用於管理OpenGL上下文,為輸入和輸出設置紋理,等等。
-
一個基於子類化的高級API,其中實現圖像過濾器的簡單計算單元子類來自GlSimpleCalculator,只需要覆蓋一對虛擬方法與它們的特定OpenGL代碼,而超類負責所有管道。
-
需要在所有基於gpu的計算單元之間共享的數據被作為外部輸入提供,它被實現為一個圖服務,並由GlCalculatorHelper類管理。
-
calculator-specific助手和一個共享圖服務讓我們極大的靈活性在管理GPU資源:我們可以有一個單獨的上下文/計算單元,共享一個上下文,分享一個鎖或其他同步,等等,所有的這些都是由輔助管理和隱藏的個人計算單元。
(4) GpuBuffer到圖像幀轉換器
我們提供了兩個計算單元稱為GpuBufferToImageFrameCalculator
和ImageFrameToGpuBufferCalculator
。這些計算單元在ImageFrame
和GpuBuffer
之間轉換,允許結合GPU和CPU計算單元的圖形的構造。iOS和Android都支持它們
如果可能,這些計算單元使用特定平台的功能來在CPU和GPU之間沒有復制的共享數據。
下圖顯示了移動應用程序中的數據流,該應用程序從攝像頭捕獲視頻,通過MediaPipe圖形運行視頻,並實時將輸出呈現在屏幕上。虛線表示哪些部分位於MediaPipe圖內部。這個應用程序使用OpenCV在CPU上運行一個精巧的邊緣檢測過濾器,並使用GPU將其覆蓋在原始視頻上。
從攝像機的視頻幀作為GpuBuffer數據包被饋入圖。輸入流由兩個計算單元並行地訪問。GpuBufferToImageFrameCalculator
將緩沖區轉換為ImageFrame
,然后通過灰度轉換器和canny過濾器(都基於OpenCV並在CPU上運行)發送,后者的輸出然后再次轉換為GpuBuffer
。一個多輸入GPU計算單元,GlOverlayCalculator,接受原始的GpuBuffer和從邊緣檢測器出來的作為輸入,並使用着色器覆蓋它們。然后使用回調計算單元將輸出發送回應用程序,應用程序使用OpenGL將圖像呈現到屏幕上。