項目介紹
ZLMediaKit是一套高性能的流媒體服務框架,目前支持rtmp/rtsp/hls/http-flv流媒體協議。該項目已支持linux、macos、windows、ios、android平台,支持的編碼格式包括H264、AAC、H265(僅rtsp支持H265);采用的模型是多線程IO多路復用非阻塞式編程(linux下采用epoll、其他平台采用select)。
該框架基於C++11開發,避免使用裸指針,減少內存拷貝,代碼精簡可靠,並發性能優異,在linux平台下,單一進程即可充分利用多核CPU的優勢;最大限度的榨干CPU、網卡性能;輕松達到萬兆網卡性能極限。同時也能在高性能的同時,做到極低延時,畫面秒開。
目前ZLMediaKit經過多次版本迭代,編程模型多次升級優化;已經趨於成熟穩定,也在各種生產環境得到了驗證,本文主要討論ZLMediaKit高性能實現原理以及項目特點。
網絡模型對比
不同於SRS的單線程多協程、node.js/redis的單線程、NGINX的多進程模型;ZLMediaKit采用的是單進程多線程模型。那么為什么ZLMediaKit要采用這樣的編程模型呢?
作為一個多年的C++服務器后台開發工程師,多年的工作經驗告訴我,作為一個服務器程序,對於穩定性要求極高;一個服務器可以性能差點,但是絕不能輕易core dump;服務中斷、重啟、異常,對於一個線上已運營項目來說結果是災難性的。那么我們該怎么確保服務器的穩定?目前有以下手段:
- 單線程模型
- 單線程+協程
- 單線程+多進程
- 多線程+鎖
- 棄用C/C++
采用單線程模型的優點是,服務器簡單可靠,不用考慮資源競爭互斥的問題,這樣可以比較容易做到高穩定性;采用此模型的典型代表項目有 redis、node.js。但是由於是單線程模型,所以弊端也比較明顯;那就是在多核cpu上不能充分利用多核CPU的算力,性能瓶頸主要在於CPU(大家應該有過在redis中執行keys *慢慢等待的經歷)。
單線程+協程的方案本質上與純單線程模型無區別,它們的區別主要編程風格上。純單線程模型使用的是非阻塞式處處回調方式實現高並發,這種模型會有所謂的回調地獄的問題,編程起來會比較麻煩。而單線程+協程的方案是簡化編程方式,采用自然的阻塞式編程風格,在協程庫內部管理任務調度,本質也是非阻塞的。但是協程庫涉及的比較底層,跟系統息息相關,所以跨平台不是很好做,而且設計實現一個協程庫門檻較高。SRS采用就是這編程模型,由於協程庫的限制,SRS不能在windows上運行。
為了解決上述單線程模型的問題,很多服務器采用單線程多進程的編程模型;在這種模型下,既有單線程模型的簡單可靠的特性,又能充分發揮多核CPU的性能,而且某個進程掛了也不會影響其他進程,像NGINX就是這種編程模型;但是這種模型也有其局限性。在這種模型下,會話間是相互隔離的,兩個會話可能運行在不同的進程上;這樣就導致了會話間通信的困難。比如說A用戶連接在服務器A進程上,B用戶連接在服務器B進程上;如果兩者之間要完成某種數據交互,那么會異常困難,這樣必須通過進程間通信來完成。而進程間通信代價和開銷比較大,編程起來也比較困難。但是如果會話間無需數據交互(例如http服務器),那么這種模型是特別適合的,所以NGINX作為http服務器也是非常成功的,但是如果是譬如即時聊天的那種需要會話間通信的服務,那么這種開發模型不是很適合。不過現在越來越多的服務都需要支持分布式集群部署,所以單線程多進程方案的缺陷越來越不明顯。
由於C/C++是種強類型靜態語言,異常處理簡單粗暴,動不動就core dump。C/C++的設計理念就是發現錯誤及早暴露,在某種意義上來說,崩潰也是種好事,因為這樣會引起你的重視,讓你能及早發現定位並解決問題,而不是把問題拖延到無法解決的時候再暴露給你。但是這么做對一般人來說,C/C++就不是很友好了,人類並不像機器那樣嚴謹,有點疏忽在所難免,況且有些小問題也無傷大雅,並不需要毀滅式的core dump來應對。而且C/C++的學習曲線異常艱難困苦,很多人好幾年也不得要領,所以很多人表示紛紛棄坑,轉投 go / erlang / node.js之類。
但是C/C++由於其性能優越性,以及歷史原因,在某些場景下是不二選擇,而且C/C++才是真正的跨平台語言;況且隨着智能指針的推出,內存管理不再是難題;而lambda語法的支持,讓程序上下文綁定不再困難。隨着C++新特性的支持,編譯器靜態反射機制的完善,現代C++編程愈發簡便快捷。ZLMediaKit采用的就是C++11新標准以及相關理念完成的高性能流媒體服務框架。
與上面其它編程模型不同,ZLMediaKit采用的是多線程開發模型;與傳統的多線程模型不同;ZLMediaKit采用了C++11的智能指針來做內存管理,在線程切換時可以完美的管理內存在多線程下共享以及其生命周期。同時互斥鎖的粒度消減至極致,幾乎可以忽略不計。所以采用多線程模型的ZLMediaKit性能損耗極低,每條線程的性能幾乎可以媲美單線程模型,同時也可以充分榨干CPU的每一核心性能。
網絡模型詳述
ZLMediaKit在啟動時會根據cpu核心數自動創建若干個epoll實例(非linux平台為select);這些epoll實例都會有一個線程來運行epoll_wait
函數來等待事件的觸發。
以ZLMediaKit的RTMP服務為例,在創建一個TcpServer
時,ZLMediaKit會把這個Tcp服務的監聽套接字加入到每一個epoll實例,這樣如果收到新的RTMP播放請求,那么多個epoll實例會在內核的調度下,自動選擇負載較輕的線程觸發accept事件,以下是代碼片段:
template <typename SessionType> void start(uint16_t port, const std::string& host = "0.0.0.0", uint32_t backlog = 1024) { start_l<SessionType>(port,host,backlog); //自動加入到所有epoll線程監聽 EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor){ EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor); if(poller == _poller || !poller){ return; } auto &serverRef = _clonedServer[poller.get()]; if(!serverRef){ //綁定epoll實例 serverRef = std::make_shared<TcpServer>(poller); } serverRef->cloneFrom(*this); }); } void cloneFrom(const TcpServer &that){ if(!that._socket){ throw std::invalid_argument("TcpServer::cloneFrom other with null socket!"); } _sessionMaker = that._sessionMaker; //克隆一個相同fd的Socket對象 _socket->cloneFromListenSocket(*(that._socket)); _timer = std::make_shared<Timer>(2, [this]()->bool { this->onManagerSession(); return true; },_poller); this->mINI::operator=(that); _cloned = true; }
服務器在收到accept事件后,會創建一個TcpSession
對象並綁定到該epoll實例(同時把與之對應的peer fd
加入到相關epoll監聽)。每一個Tcp連接都會對應一個TcpSession
對象,在之后客戶端與服務器的數據交互中,該TcpSession
對象處理一切與之相關的業務數據,並且該對象之后生命周期內的一切事件都會由該epoll線程觸發,這樣服務器的每個epoll線程都能均勻的分派到合理的客戶端數量。以下是服務器accept事件處理邏輯代碼片段:
// 接收到客戶端連接請求 virtual void onAcceptConnection(const Socket::Ptr & sock) { weak_ptr<TcpServer> weakSelf = shared_from_this(); //創建一個TcpSession;這里實現創建不同的服務會話實例 auto sessionHelper = _sessionMaker(weakSelf,sock); auto &session = sessionHelper->session(); //把本服務器的配置傳遞給TcpSession session->attachServer(*this); //TcpSession的唯一識別符,可以是guid之類的 auto sessionId = session->getIdentifier(); //記錄該TcpSession if(!SessionMap::Instance().add(sessionId,session)){ //有同名session,說明getIdentifier生成的標識符有問題 WarnL << "SessionMap::add failed:" << sessionId; return; } //SessionMap中沒有相關記錄,那么_sessionMap更不可能有相關記錄了; //所以_sessionMap::emplace肯定能成功 auto success = _sessionMap.emplace(sessionId, sessionHelper).second; assert(success == true); weak_ptr<TcpSession> weakSession(session); //會話接收數據事件 sock->setOnRead([weakSession](const Buffer::Ptr &buf, struct sockaddr *addr){ //獲取會話強引用 auto strongSession=weakSession.lock(); if(!strongSession) { //會話對象已釋放 return; } //TcpSession處理業務數據 strongSession->onRecv(buf); }); //會話接收到錯誤事件 sock->setOnErr([weakSelf,weakSession,sessionId](const SockException &err){ //在本函數作用域結束時移除會話對象 //目的是確保移除會話前執行其onError函數 //同時避免其onError函數拋異常時沒有移除會話對象 onceToken token(nullptr,[&](){ //移除掉會話 SessionMap::Instance().remove(sessionId); auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } //在TcpServer對應線程中移除map相關記錄 strongSelf->_poller->async([weakSelf,sessionId](){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ return; } strongSelf->_sessionMap.erase(sessionId); }); }); //獲取會話強應用 auto strongSession=weakSession.lock(); if(strongSession) { //觸發onError事件回調 strongSession->onError(err); } }); }
通過上訴描述,我們應該大概了解了ZLMediaKit的網絡模型,通過這樣的模型基本上能榨干CPU的算力,不過CPU算力如果使用不當 ,也可能白白浪費,使之做一些無用的事務,那么在ZLMediaKit中還有那些技術手段來提高性能呢?我們在下節展開論述。
關閉互斥鎖
上一節論述中,我們知道TcpSession
是ZLMediaKit中的關鍵元素,服務器大部分計算都在TcpSession內完成。一個TcpSession
由一個epoll實例掌管其生命周期,其他線程不得直接操作該TcpSession
對象(必須通過線程切換到對應的epoll線程來完成操作);所以從某種意義上來說TcpSeesion
是單線程模型的;所以ZLMediaKit對於TcpSession
所對應的網絡io操作是無互斥鎖保護的,ZLMediaKit作為服務器模式運行,基本上是無鎖的;這種情況下,鎖對性能的影響幾乎可以忽略不計。以下是ZLMediaKit關閉互斥鎖的代碼片段:
virtual Socket::Ptr onBeforeAcceptConnection(const EventPoller::Ptr &poller){ /** * 服務器模型socket是線程安全的,所以為了提高性能,關閉互斥鎖 * Socket構造函數第二個參數即為是否關閉互斥鎖 */ return std::make_shared<Socket>(poller,false); } //Socket對象的構造函數,第二個參數即為是否關閉互斥鎖 Socket::Socket(const EventPoller::Ptr &poller,bool enableMutex) : _mtx_sockFd(enableMutex), _mtx_bufferWaiting(enableMutex), _mtx_bufferSending(enableMutex) { _poller = poller; if(!_poller){ _poller = EventPollerPool::Instance().getPoller(); } _canSendSock = true; _readCB = [](const Buffer::Ptr &buf,struct sockaddr *) { WarnL << "Socket not set readCB"; }; _errCB = [](const SockException &err) { WarnL << "Socket not set errCB:" << err.what(); }; _acceptCB = [](Socket::Ptr &sock) { WarnL << "Socket not set acceptCB"; }; _flushCB = []() {return true;}; _beforeAcceptCB = [](const EventPoller::Ptr &poller){ return nullptr; }; } //MutexWrapper對象定義,可以選擇是否關閉互斥鎖 template <class Mtx = recursive_mutex> class MutexWrapper { public: MutexWrapper(bool enable){ _enable = enable; } ~MutexWrapper(){} inline void lock(){ if(_enable){ _mtx.lock(); } } inline void unlock(){ if(_enable){ _mtx.unlock(); } } private: bool _enable; Mtx _mtx; };
規避內存拷貝
傳統的多線程模型下,做數據轉發會存在線程切換的問題,為了確保線程安全,一般使用內存拷貝來規避該問題;而且對數據進行分包處理也很難做到不使用內存拷貝。但是流媒體這種業務邏輯,可能觀看同一個直播的用戶是海量的,如果每分發一次就做內存拷貝,那么開銷是十分可觀的,這將嚴重拖累服務器性能。
ZLMediaKit在做媒體數據轉發時,是不會做內存拷貝的,常規的C++多線程編程很難做到這一點,但是我們在C++11的加持下,利用引用計數,巧妙的解決了多線程內存生命周期管理的問題,以下是RTMP服務器做媒體數據分發規避內存拷貝的代碼片段:
void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const Buffer::Ptr &buf, uint32_t ui32TimeStamp, int iChunkId){ if (iChunkId < 2 || iChunkId > 63) { auto strErr = StrPrinter << "不支持發送該類型的塊流 ID:" << iChunkId << endl; throw std::runtime_error(strErr); } //是否有擴展時間戳 bool bExtStamp = ui32TimeStamp >= 0xFFFFFF; //rtmp頭 BufferRaw::Ptr bufferHeader = obtainBuffer(); bufferHeader->setCapacity(sizeof(RtmpHeader)); bufferHeader->setSize(sizeof(RtmpHeader)); //對rtmp頭賦值,如果使用整形賦值,在arm android上可能由於數據對齊導致總線錯誤的問題 RtmpHeader *header = (RtmpHeader*) bufferHeader->data(); header->flags = (iChunkId & 0x3f) | (0 << 6); header->typeId = ui8Type; set_be24(header->timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); set_be24(header->bodySize, buf->size()); set_le32(header->streamId, ui32StreamId); //發送rtmp頭 onSendRawData(bufferHeader); //擴展時間戳字段 BufferRaw::Ptr bufferExtStamp; if (bExtStamp) { //生成擴展時間戳 bufferExtStamp = obtainBuffer(); bufferExtStamp->setCapacity(4); bufferExtStamp->setSize(4); set_be32(bufferExtStamp->data(), ui32TimeStamp); } //生成一個字節的flag,標明是什么chunkId BufferRaw::Ptr bufferFlags = obtainBuffer(); bufferFlags->setCapacity(1); bufferFlags->setSize(1); bufferFlags->data()[0] = (iChunkId & 0x3f) | (3 << 6); size_t offset = 0; uint32_t totalSize = sizeof(RtmpHeader); while (offset < buf->size()) { if (offset) { //發送trunkId onSendRawData(bufferFlags); totalSize += 1; } if (bExtStamp) { //擴展時間戳 onSendRawData(bufferExtStamp); totalSize += 4; } size_t chunk = min(_iChunkLenOut, buf->size() - offset); //分發流媒體數據包,此處規避了內存拷貝 onSendRawData(std::make_shared<BufferPartial>(buf,offset,chunk)); totalSize += chunk; offset += chunk; } _ui32ByteSent += totalSize; if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) { _ui32LastSent = _ui32ByteSent; sendAcknowledgement(_ui32ByteSent); } } //BufferPartial對象用於rtmp包的chunk大小分片,規避內存拷貝 class BufferPartial : public Buffer { public: BufferPartial(const Buffer::Ptr &buffer,uint32_t offset,uint32_t size){ _buffer = buffer; _data = buffer->data() + offset; _size = size; } ~BufferPartial(){} char *data() const override { return _data; } uint32_t size() const override{ return _size; } private: Buffer::Ptr _buffer; char *_data; uint32_t _size; };
我們在發送RTP包時也是采用同樣的原理來避免內存拷貝。
使用對象循環池
內存開辟銷毀是全局互斥的,過多的new/delete 不僅降低程序性能,還會導致內存碎片。ZLMediaKit盡量使用循環池來避免這些問題,以下代碼時RTP包循環池使用代碼片段:
RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, unsigned int len, bool mark, uint32_t uiStamp) { uint16_t ui16RtpLen = len + 12; uint32_t ts = htonl((_ui32SampleRate / 1000) * uiStamp); uint16_t sq = htons(_ui16Sequence); uint32_t sc = htonl(_ui32Ssrc); //采用循環池來獲取rtp對象 auto rtppkt = ResourcePoolHelper<RtpPacket>::obtainObj(); unsigned char *pucRtp = rtppkt->payload; pucRtp[0] = '$'; pucRtp[1] = _ui8Interleaved; pucRtp[2] = ui16RtpLen >> 8; pucRtp[3] = ui16RtpLen & 0x00FF; pucRtp[4] = 0x80; pucRtp[5] = (mark << 7) | _ui8PlayloadType; memcpy(&pucRtp[6], &sq, 2); memcpy(&pucRtp[8], &ts, 4); //ssrc memcpy(&pucRtp[12], &sc, 4); //playload memcpy(&pucRtp[16], data, len); rtppkt->PT = _ui8PlayloadType; rtppkt->interleaved = _ui8Interleaved; rtppkt->mark = mark; rtppkt->length = len + 16; rtppkt->sequence = _ui16Sequence; rtppkt->timeStamp = uiStamp; rtppkt->ssrc = _ui32Ssrc; rtppkt->type = type; rtppkt->offset = 16; _ui16Sequence++; _ui32TimeStamp = uiStamp; return rtppkt; }
設置Socket相關標志
開啟TCP_NODELAY后可以提高服務器響應速度,對於一些對延時要求比較敏感的服務(比如ssh服務),開啟TCP_NODELAY標記比較重要。但是對於流媒體服務,由於數據是源源不斷並且量也比較大,所以關閉TCP_NODELAY可以減少ACK包數量,充分利用帶寬資源。
MSG_MORE是另外一個提高網絡吞吐量的標記;這個標記的作用是在發送數據時,服務器會緩存一定的數據然后再打包一次性發送出去;而像RTSP這種業務場景,MSG_MORE標記就顯得格外合適;因為RTP包一般都很小(小於MTU),通過MSG_MORE標記可以極大減少數據包個數。
ZLMediaKit在處理播放器時,握手期間是開啟TCP_NODELAY並且關閉MSG_MORE的,這樣做的目的是提高握手期間數據交互的延時,減少鏈接建立耗時,提高視頻打開速度。在握手成功后,ZLMediaKit會關閉TCP_NODELAY並打開MSG_MORE;這樣又能減少數據報文個數,提高網絡利用率。
批量數據發送
網絡編程中,大家應該都用過send/sendto/write函數,但是writev/sendmsg函數應該用的不多。ZLMediaKit采用sendmsg函數來做批量數據發送,這樣在網絡不是很好或者服務器負載比較高時,可以明顯減少系統調用(系統調用開銷比較大)次數,提高程序性能。以下是代碼片段:
int BufferList::send_l(int fd, int flags,bool udp) { int n; do { struct msghdr msg; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = &(_iovec[_iovec_off]); msg.msg_iovlen = _iovec.size() - _iovec_off; if(msg.msg_iovlen > IOV_MAX){ msg.msg_iovlen = IOV_MAX; } msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = flags; n = udp ? send_iovec(fd,&msg,flags) : sendmsg(fd,&msg,flags); } while (-1 == n && UV_EINTR == get_uv_error(true)); if(n >= _remainSize){ //全部寫完了 _iovec_off = _iovec.size(); _remainSize = 0; return n; } if(n > 0){ //部分發送成功 reOffset(n); return n; } //一個字節都未發送 return n; }
批量線程切換
多線程模型下,流媒體服務器在做媒體數據分發時,肯定要做線程切換。線程切換的目的一是確保線程安全,防止多條線程同時操作某個對象或資源;二是可以充分利用多核算力,防止單線程成為轉發性能瓶頸。ZLMediaKit在做媒體轉發時,也同樣使用到線程切換來實現多線程的數據分發。但是線程切換開銷也比較大,如果線程切換次數太多,將嚴重影響服務器性能。
現在我們假設一個場景:RTMP推流客戶端A推送一個直播到服務器,這個直播比較火爆,假設有同時10K個用戶正在觀看這個直播,那么我們在分發一個RTMP數據包時是否需要最多進行10K次線程切換然后再發送數據?雖然ZLMediaKit的線程切換比較輕量,但是這樣頻繁的線程切換也是扛不住的。
ZLMediaKit在處理這類問題時,采用批量線程切換來盡量減少線程切換次數。假如說這10K的用戶分布在32個cpu核心上,那么ZLMediaKit最多進行32次線程切換,這樣ZLMediaKit將大大減少線程切換次數,同時又能使用多線程來分發數據,大大提高網絡吞吐量,以下是批量線程切換代碼片段:
void emitRead(const T &in){ LOCK_GUARD(_mtx_map); for (auto &pr : _dispatcherMap) { auto second = pr.second; //批量線程切換 pr.first->async([second,in](){ second->emitRead(in); },false); } } //線程切換后再做遍歷 void emitRead(const T &in){ for (auto it = _readerMap.begin() ; it != _readerMap.end() ;) { auto reader = it->second.lock(); if(!reader){ it = _readerMap.erase(it); --_readerSize; onSizeChanged(); continue; } //觸發數據分發操作 reader->onRead(in); ++it; } }
采用右值引用拷貝
ZLMediaKit中也盡量使用右值引用拷貝來規避內存拷貝,這里就不展開論述。
其他特性
優化及時推流打開率
有些應用場景需要設備端開始推流,然后APP立即觀看的應用場景。傳統的rtmp服務器對此應用場景是未作任何優化的,如果APP播放請求在推流尚未建立之前到達,那么將導致APP播放失敗,這樣視頻打開成功率就會降低,用戶體驗很不好。
ZLMediaKit在針對該應用場景時,做了特別的優化;實現原理如下:
1、收到播放請求時,立即檢查是否已經存在的媒體源,如果存在返回播放成功,否則進入第2步。
2、監聽對應的媒體源注冊事件,同時添加播放超時定時器,並且不回復播放器然后返回。邏輯將進入第3步或第4步。
3、媒體源注冊成功,那么立即響應播放器播放成功,同時刪除播放超時定時器,並移除媒體注冊事件監聽。
4、超時定時器觸發,響應播放器播放失敗,同時刪除播放超時定時器,並移除媒體注冊事件監聽。
使用ZLMediaKit作為流媒體服務器,可以APP播放請求和設備端推流同時進行。
性能測試對比
目前對ZLMediaKit做了一些性能測試,查看地址:benchmark
在測試時發現,ZLMediaKit在負載比較低時,其單線程性能大概是SRS的50%,單條線程大概能支撐5K個播放器,導致這個性能差距的主要原因時由於采用本地輪回網絡,網絡狀況為理想,那么sendmsg批量發送將不起優化左右;而SRS使用了合並寫特性(就是緩存300毫秒左右的數據后一次性發送),可以減少系統調用次數;如果負載比較高,以及真實網絡環境下,ZLMediaKit單線程性能應該跟SRS差距更小,我們在測試報告中也能發現在客戶端比較多時,ZLMediaKit單線程線程性能有比較大的提升。
由於ZLMediaKit支持多線程,可以充分利用多核CPU的性能,在多核服務器上,CPU已經不再是性能瓶頸,為了減少直播延時,目前合並寫特性是默認關閉的,可以通過配置文件開啟。
項目地址
目前ZLMediaKit已經開源,地址為:ZLMediaKit