實現TTCP (檢測TCP吞吐量)


實現TTCP (檢測TCP吞吐量)

應用層協議

為了解決TCP粘包問題以及客戶端阻塞問題
設計的應用層協議如下:

//告知要發送的數據包個數和長度
struct SessionMessage
{
  int32_t number;
  int32_t length;
} __attribute__ ((__packed__));

//數據包
struct PayloadMessage
{
  int32_t length;
  char data[0];//使用char[0]來表示不定長的數據,可以考慮用const char* 和 std::unique_ptr代替
};

為什么要設計應用層ACK?

因為我們測量的是應用層的流量,只有這樣才能保證測出的流量是有應用層收到的而不是傳輸層收到的,具體一點說,TCP 的 ACK 表示對方的協議棧已經收到了你發的數據,不代表對方的應用程序收到了你發的消息。

測試指標

帶寬 Mb/s
測試程序的性能指標: 傳輸帶寬,QPS/TPS, 以及 CPU利用率,延遲等等。

程序代碼

我們主要關注業務邏輯,客戶端和服務端的主要代碼如下

客戶端

void transmit(const Options& opt)
{
  InetAddress addr(opt.port);
  if (!InetAddress::resolve(opt.host.c_str(), &addr))
  {
    printf("Unable to resolve %s\n", opt.host.c_str());
    return;
  }

  printf("connecting to %s\n", addr.toIpPort().c_str());
  TcpStreamPtr stream(TcpStream::connect(addr));
  if (!stream)
  {
    printf("Unable to connect %s\n", addr.toIpPort().c_str());
    perror("");
    return;
  }

  if (opt.nodelay)
  {
    stream->setTcpNoDelay(true);
  }
  printf("connected\n");
  double start = now();
  struct SessionMessage sessionMessage = { 0, 0 };
  sessionMessage.number = htonl(opt.number);
  sessionMessage.length = htonl(opt.length);
  if (stream->sendAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  {
    perror("write SessionMessage");
    return;
  }

  const int total_len = sizeof(int32_t) + opt.length;
  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  assert(payload);
  payload->length = htonl(opt.length);
  for (int i = 0; i < opt.length; ++i)
  {
    payload->data[i] = "0123456789ABCDEF"[i % 16];
  }

  double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;
  printf("%.3f MiB in total\n", total_mb);

  for (int i = 0; i < opt.number; ++i)
  {
    int nw = stream->sendAll(payload, total_len);
    assert(nw == total_len);

    int ack = 0;
    int nr = stream->receiveAll(&ack, sizeof(ack));
    assert(nr == sizeof(ack));
    ack = ntohl(ack);
    assert(ack == opt.length);
  }

  double elapsed = now() - start;
  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
}

服務端

void receive(const Options& opt)
{
  Acceptor acceptor(InetAddress(opt.port));
  TcpStreamPtr stream(acceptor.accept());
  if (!stream)
  {
    return;
  }
  struct SessionMessage sessionMessage = { 0, 0 };
  if (stream->receiveAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  {
    perror("read SessionMessage");
    return;
  }

  sessionMessage.number = ntohl(sessionMessage.number);
  sessionMessage.length = ntohl(sessionMessage.length);
  printf("receive buffer length = %d\nreceive number of buffers = %d\n",
         sessionMessage.length, sessionMessage.number);
  double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024;
  printf("%.3f MiB in total\n", total_mb);

  const int total_len = sizeof(int32_t) + sessionMessage.length;
  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  assert(payload);

  double start = now();
  for (int i = 0; i < sessionMessage.number; ++i)
  {
    payload->length = 0;
    if (stream->receiveAll(&payload->length, sizeof(payload->length)) != sizeof(payload->length))
    {
      perror("read length");
      return;
    }
    payload->length = ntohl(payload->length);
    assert(payload->length == sessionMessage.length);
    if (stream->receiveAll(payload->data, payload->length) != payload->length)
    {
      perror("read payload data");
      return;
    }
    int32_t ack = htonl(payload->length);
    if (stream->sendAll(&ack, sizeof(ack)) != sizeof(ack))
    {
      perror("write ack");
      return;
    }
  }
  double elapsed = now() - start;
  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM