Reliable Multicast Programming(PGM)協議


Reliable Multicast Programming (PGM)實際通用可靠多播協議,在某種程度上保證多播的可靠性。是IP上層協議,和TCP還有UDP同級,工作在傳輸層。

在組播傳輸視頻項目中,發現在網絡較差的時候,組播傳輸視頻性能下降迅猛,組播的視頻幾乎到了無法直視的地步,已經不是馬賽克什么的問題了,簡直就是一張臭抹布。

但是上面的要求是讓接收端達到1080p 16fps的播放效果,此時組播接收端的實時網絡速率只有50KB/s左右,這種情況下要從軟件上處理的話(因為路由器不好換),需要讓組播的丟包率降低才行,但是使用iperf測了下當時網絡的丟包率,能丟到80%,丟到他姥姥家?

但是此時帶寬利用率卻很低,趕緊換成udp單播試了一下,速度能上去,也不怎么花屏了,不清楚是不是確認機制的問題。
但是總不能說把組播換成單播,當接入的接收端變多的時候,不清楚單播效果會不會也變差。

這個時候發現了PGM,“可靠”多播協議,有不少基於PGM實現的庫,打算先用windows上的寫個demo出來。

想要使用PGM需要先在網絡適配器上安裝協議,安裝完成后會在屬性中出現可靠多播協議

然后就是開發了,官網文檔提供的demo很棒,copy下來幾乎就能跑起來。但是除了官網文檔,相關資料就比較少了,頭文件我還找了半天,環境上坑不少,記錄一下

wsrm.h頭文件

首先是windows sdk,我試了一下如果是8.1的sdk的話,是找不到wsrm.h頭文件的,我有裝10.0.17134.0,8.1還有10.0.15063.0三個版本的windows sdk,用everything找了一下這個頭文件,得到了下面圖示結果

8.1應該是沒有,剩下兩個版本均可以使用,更新的版本應該也行。

vs2017以上的話在visual studio installer里面修改多裝個sdk就行了

傳輸速度

PGM本身也有發送窗口的概念,如果使用默認設置,窗口小,發送速度非常慢,每秒最多只有70KB左右,這時候需要設置socket選項
RateKbitsPerSec 的單位是kilobits/s,是一個上限

    RM_SEND_WINDOW send_window;
    send_window.WindowSizeInBytes = 8000 * 1000;
    send_window.WindowSizeInMSecs = 1;
    send_window.RateKbitsPerSec = (send_window.WindowSizeInBytes/send_window.WindowSizeInMSecs)*8;


    int rc = setsockopt(s, IPPROTO_RM, RM_RATE_WINDOW_SIZE, (char *)&send_window, sizeof(send_window));
    if (rc == SOCKET_ERROR)
    {

        cout << "setsockopt(): RM_RATE_WINDOW_SIZE failed with error code " << WSAGetLastError() << endl;

    }

RM_SEND_WINDOW結構體就這么三個成員,第一個是每秒速度了,第二個是發送窗口的大小,第三個是窗口大小毫秒,其中windows會強制讓
RateKbitsPerSec/8 = WindowSizeInBytes * WindowSizeInMSecs

PS:WindowSzieInMSecs的值需要調整,當WindowSizeInBytes=8000並且WindowSzieInMSecs=1時,發送端較大概率會阻塞,原因未知,可能是發包速度過快導致

真的可靠么?

在文章一開始的時候可靠被加上了雙引號,為的是表明這個協議並不是想象中的那么可靠。

發送窗口大小有限,如果需要恢復重傳的數據在發送窗口之外了,那數據就是不可恢復的,一般當發送端速率過快接收端接收速度明顯跟不上時,就會出現不可恢復現象。一旦出現不可恢復數據時,windows就會讓接收端的連接重置,此時就不能繼續接收。

源碼

因為找不到對應的頭文件讓我着實頭疼了很久,相關文檔少,還不告訴我頭文件是什么,這太不爽了,就好比讓你看着門后面有啥,就是不給你鑰匙。

pgm分為server端和client端,功能是發送文件,根據msdn的文檔編寫的

下面是server端代碼

#define _WINSOCK_DEPRECATED_NO_WARNINGS
#define WIN32_LEAN_AND_MEAN

#include <iostream>
#include<winsock2.h>
#include<WS2tcpip.h>	//ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>

using namespace std;
#pragma comment(lib,"ws2_32.lib")

int main() {
    WSADATA WSAData;
    WORD sockVersion = MAKEWORD(2, 2);
    if (WSAStartup(sockVersion, &WSAData) != 0)
        return 0;
    
    FILE *fp;
    fopen_s(&fp, "test.webm", "rb+");

    SOCKET        s;
    SOCKADDR_IN   salocal, sasession;
    int           dwSessionPort;

    s = socket(AF_INET, SOCK_RDM, IPPROTO_RM);

    salocal.sin_family = AF_INET;
    salocal.sin_port = htons(0);    // Port is ignored here
    salocal.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(s, (SOCKADDR *)&salocal, sizeof(salocal));

    //
    // Set all relevant sender socket options here
    //

    //
    // Now, connect <entity type="hellip"/>
    // Setting the connection port (dwSessionPort) has relevance, and
    // can be used to multiplex multiple sessions to the same
    // multicast group address over different ports
    //
    dwSessionPort = 1234;
    sasession.sin_family = AF_INET;
    sasession.sin_port = htons(dwSessionPort);
    sasession.sin_addr.s_addr = inet_addr("224.4.5.6");

    RM_SEND_WINDOW send_window;
    send_window.WindowSizeInBytes = 8000;
    send_window.WindowSizeInMSecs = 1;
    send_window.RateKbitsPerSec = (send_window.WindowSizeInBytes/send_window.WindowSizeInMSecs)*8;

    int rc = setsockopt(s, IPPROTO_RM, RM_RATE_WINDOW_SIZE, (char *)&send_window, sizeof(send_window));
    if (rc == SOCKET_ERROR)
    {

        cout << "setsockopt(): RM_RATE_WINDOW_SIZE failed with error code " << WSAGetLastError() << endl;

    }
    connect(s, (SOCKADDR *)&sasession, sizeof(sasession));

    //
    // We're now ready to send data!
    //
    char pSendBuffer[1400];

    sockaddr_in serverAddr;
    int iAddrlen = sizeof(serverAddr);




    while (1) {
        if (feof(fp))
            break;
        memset(pSendBuffer, 0, 1400);

        int data_size = fread(pSendBuffer, 1, 1400, fp);
        
        LONG        error;

        error = sendto(s, pSendBuffer, data_size, 0, (sockaddr*)&serverAddr,iAddrlen);

        if (error == SOCKET_ERROR)
        {
            fprintf(stderr, "send() failed: Error = %d\n",
                WSAGetLastError());
        }
    }

    WSACleanup();
    return 0;
}

下面是client端代碼

#include <iostream>
#include<winsock2.h>
#include<WS2tcpip.h>	//ip_mreqͷ
#include <wsrm.h>
#include <stdio.h>

using namespace std;
#pragma comment(lib,"ws2_32.lib")

int main() {
    WSADATA WSAData;
    WORD sockVersion = MAKEWORD(2, 2);
    if (WSAStartup(sockVersion, &WSAData) != 0)
        return 0;


    SOCKET        s,
        sclient;
    SOCKADDR_IN   salocal,
        sasession;
    int           sasessionsz, dwSessionPort;

    FILE * fp;
    fopen_s(&fp, "aaatest.webm", "wb+");

    s = socket(AF_INET, SOCK_RDM, IPPROTO_RM);

    //
    // The bind port (dwSessionPort) specified should match that
    // which the sender specified in the connect call
    //
    dwSessionPort = 1234;
    salocal.sin_family = AF_INET;
    salocal.sin_port = htons(dwSessionPort);
    salocal.sin_addr.s_addr = inet_addr("224.4.5.6");
    int receive_buf_size = 65536 * 10;
    if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
    {
        std::cout << "setsockopt():SO_RCVBUF failed with error code" << WSAGetLastError() << std::endl;
    }


    bind(s, (SOCKADDR *)&salocal, sizeof(salocal));

    //
    // Set all relevant receiver socket options here
    //

    listen(s, 10);

    sasessionsz = sizeof(sasession);
    sclient = accept(s, (SOCKADDR *)&sasession, &sasessionsz);

    if (setsockopt(sclient, SOL_SOCKET, SO_RCVBUF, (char*)&receive_buf_size, sizeof(receive_buf_size)) < 0)
    {
        std::cout << "setsockopt():SO_RCVBUF failed with error code" << WSAGetLastError() << std::endl;
    }
    //
    // accept will return the client socket and we are now ready
    // to receive data on the new socket!
    //
    LONG BytesRead;
    char pTestBuffer[1400];

    sockaddr_in clientAddr;
    int iAddrlen = sizeof(clientAddr);

    while (1)
    {
        memset(pTestBuffer, 0, 1400);
        cout << "start" << endl;
        BytesRead = recvfrom(sclient, pTestBuffer, 1400, 0, (sockaddr*)&clientAddr, &iAddrlen);
        cout << "end" << endl;
        if (BytesRead == 0)
        {
            fprintf(stdout, "Session was terminated\n");
        }
        else if (BytesRead == -1)
        {
            std::cout << "no data?!" << std::endl;
        }
        if (BytesRead > 0)
        {
            fwrite(pTestBuffer, 1, BytesRead, fp);
            std::cout << BytesRead << std::endl;
        }
    }
    fclose(fp);
    WSACleanup();
    return 0;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM