Reliable Multicast Programming (PGM)實際通用可靠多播協議,在某種程度上保證多播的可靠性。是IP上層協議,和TCP還有UDP同級,工作在傳輸層。
在組播傳輸視頻項目中,發現在網絡較差的時候,組播傳輸視頻性能下降迅猛,組播的視頻幾乎到了無法直視的地步,已經不是馬賽克什么的問題了,簡直就是一張臭抹布。
但是上面的要求是讓接收端達到1080p 16fps的播放效果,此時組播接收端的實時網絡速率只有50KB/s左右,這種情況下要從軟件上處理的話(因為路由器不好換),需要讓組播的丟包率降低才行,但是使用iperf測了下當時網絡的丟包率,能丟到80%,丟到他姥姥家?
但是此時帶寬利用率卻很低,趕緊換成udp單播試了一下,速度能上去,也不怎么花屏了,不清楚是不是確認機制的問題。
但是總不能說把組播換成單播,當接入的接收端變多的時候,不清楚單播效果會不會也變差。
這個時候發現了PGM,“可靠”多播協議,有不少基於PGM實現的庫,打算先用windows上的寫個demo出來。
想要使用PGM需要先在網絡適配器上安裝協議,安裝完成后會在屬性中出現可靠多播協議
然后就是開發了,官網文檔提供的demo很棒,copy下來幾乎就能跑起來。但是除了官網文檔,相關資料就比較少了,頭文件我還找了半天,環境上坑不少,記錄一下
wsrm.h
頭文件
首先是windows sdk,我試了一下如果是8.1的sdk的話,是找不到wsrm.h
頭文件的,我有裝10.0.17134.0,8.1還有10.0.15063.0三個版本的windows sdk,用everything找了一下這個頭文件,得到了下面圖示結果
8.1應該是沒有,剩下兩個版本均可以使用,更新的版本應該也行。
vs2017以上的話在visual studio installer里面修改多裝個sdk就行了
傳輸速度
PGM本身也有發送窗口的概念,如果使用默認設置,窗口小,發送速度非常慢,每秒最多只有70KB左右,這時候需要設置socket選項
RateKbitsPerSec
的單位是kilobits/s,是一個上限
RM_SEND_WINDOW send_window;
send_window.WindowSizeInBytes = 8000 * 1000;
send_window.WindowSizeInMSecs = 1;
send_window.RateKbitsPerSec = (send_window.WindowSizeInBytes/send_window.WindowSizeInMSecs)*8;
int rc = setsockopt(s, IPPROTO_RM, RM_RATE_WINDOW_SIZE, (char *)&send_window, sizeof(send_window));
if (rc == SOCKET_ERROR)
{
cout << "setsockopt(): RM_RATE_WINDOW_SIZE failed with error code " << WSAGetLastError() << endl;
}
RM_SEND_WINDOW
結構體就這么三個成員,第一個是每秒速度了,第二個是發送窗口的大小,第三個是窗口大小毫秒,其中windows會強制讓
RateKbitsPerSec/8 = WindowSizeInBytes * WindowSizeInMSecs
PS:WindowSzieInMSecs
的值需要調整,當WindowSizeInBytes=8000
並且WindowSzieInMSecs=1
時,發送端較大概率會阻塞,原因未知,可能是發包速度過快導致
真的可靠么?
在文章一開始的時候可靠被加上了雙引號,為的是表明這個協議並不是想象中的那么可靠。
發送窗口大小有限,如果需要恢復重傳的數據在發送窗口之外了,那數據就是不可恢復的,一般當發送端速率過快接收端接收速度明顯跟不上時,就會出現不可恢復現象。一旦出現不可恢復數據時,windows就會讓接收端的連接重置,此時就不能繼續接收。
源碼
因為找不到對應的頭文件讓我着實頭疼了很久,相關文檔少,還不告訴我頭文件是什么,這太不爽了,就好比讓你看着門后面有啥,就是不給你鑰匙。
pgm分為server端和client端,功能是發送文件,根據msdn的文檔編寫的
下面是server端代碼
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#define WIN32_LEAN_AND_MEAN
#include <iostream>
#include<winsock2.h>
#include<WS2tcpip.h> //ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
int main() {
WSADATA WSAData;
WORD sockVersion = MAKEWORD(2, 2);
if (WSAStartup(sockVersion, &WSAData) != 0)
return 0;
FILE *fp;
fopen_s(&fp, "test.webm", "rb+");
SOCKET s;
SOCKADDR_IN salocal, sasession;
int dwSessionPort;
s = socket(AF_INET, SOCK_RDM, IPPROTO_RM);
salocal.sin_family = AF_INET;
salocal.sin_port = htons(0); // Port is ignored here
salocal.sin_addr.s_addr = htonl(INADDR_ANY);
bind(s, (SOCKADDR *)&salocal, sizeof(salocal));
//
// Set all relevant sender socket options here
//
//
// Now, connect <entity type="hellip"/>
// Setting the connection port (dwSessionPort) has relevance, and
// can be used to multiplex multiple sessions to the same
// multicast group address over different ports
//
dwSessionPort = 1234;
sasession.sin_family = AF_INET;
sasession.sin_port = htons(dwSessionPort);
sasession.sin_addr.s_addr = inet_addr("224.4.5.6");
RM_SEND_WINDOW send_window;
send_window.WindowSizeInBytes = 8000;
send_window.WindowSizeInMSecs = 1;
send_window.RateKbitsPerSec = (send_window.WindowSizeInBytes/send_window.WindowSizeInMSecs)*8;
int rc = setsockopt(s, IPPROTO_RM, RM_RATE_WINDOW_SIZE, (char *)&send_window, sizeof(send_window));
if (rc == SOCKET_ERROR)
{
cout << "setsockopt(): RM_RATE_WINDOW_SIZE failed with error code " << WSAGetLastError() << endl;
}
connect(s, (SOCKADDR *)&sasession, sizeof(sasession));
//
// We're now ready to send data!
//
char pSendBuffer[1400];
sockaddr_in serverAddr;
int iAddrlen = sizeof(serverAddr);
while (1) {
if (feof(fp))
break;
memset(pSendBuffer, 0, 1400);
int data_size = fread(pSendBuffer, 1, 1400, fp);
LONG error;
error = sendto(s, pSendBuffer, data_size, 0, (sockaddr*)&serverAddr,iAddrlen);
if (error == SOCKET_ERROR)
{
fprintf(stderr, "send() failed: Error = %d\n",
WSAGetLastError());
}
}
WSACleanup();
return 0;
}
下面是client端代碼
#include <iostream>
#include<winsock2.h>
#include<WS2tcpip.h> //ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
int main() {
WSADATA WSAData;
WORD sockVersion = MAKEWORD(2, 2);
if (WSAStartup(sockVersion, &WSAData) != 0)
return 0;
SOCKET s,
sclient;
SOCKADDR_IN salocal,
sasession;
int sasessionsz, dwSessionPort;
FILE * fp;
fopen_s(&fp, "aaatest.webm", "wb+");
s = socket(AF_INET, SOCK_RDM, IPPROTO_RM);
//
// The bind port (dwSessionPort) specified should match that
// which the sender specified in the connect call
//
dwSessionPort = 1234;
salocal.sin_family = AF_INET;
salocal.sin_port = htons(dwSessionPort);
salocal.sin_addr.s_addr = inet_addr("224.4.5.6");
int receive_buf_size = 65536 * 10;
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
{
std::cout << "setsockopt():SO_RCVBUF failed with error code" << WSAGetLastError() << std::endl;
}
bind(s, (SOCKADDR *)&salocal, sizeof(salocal));
//
// Set all relevant receiver socket options here
//
listen(s, 10);
sasessionsz = sizeof(sasession);
sclient = accept(s, (SOCKADDR *)&sasession, &sasessionsz);
if (setsockopt(sclient, SOL_SOCKET, SO_RCVBUF, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
{
std::cout << "setsockopt():SO_RCVBUF failed with error code" << WSAGetLastError() << std::endl;
}
//
// accept will return the client socket and we are now ready
// to receive data on the new socket!
//
LONG BytesRead;
char pTestBuffer[1400];
sockaddr_in clientAddr;
int iAddrlen = sizeof(clientAddr);
while (1)
{
memset(pTestBuffer, 0, 1400);
cout << "start" << endl;
BytesRead = recvfrom(sclient, pTestBuffer, 1400, 0, (sockaddr*)&clientAddr, &iAddrlen);
cout << "end" << endl;
if (BytesRead == 0)
{
fprintf(stdout, "Session was terminated\n");
}
else if (BytesRead == -1)
{
std::cout << "no data?!" << std::endl;
}
if (BytesRead > 0)
{
fwrite(pTestBuffer, 1, BytesRead, fp);
std::cout << BytesRead << std::endl;
}
}
fclose(fp);
WSACleanup();
return 0;
}