CS144 lab1 TCPReassembler學習筆記


 概述

    在實驗 0 中,您使用 Internet 流套接字從網站獲取信息並發送電子郵件,使用 Linux 的傳輸控制的內置實現協議 (TCP)。這個 TCP 實現設法產生了一對可靠的有序字節流(一個從你到服務器,一個在相反的方向),即使底層網絡只提供“盡力而為”的數據報。我們的意思是:可以丟失、重新排序、更改或復制的數據包。您還實施了自己的字節流抽象,在一台計算機的內存中。在接下來的四個星期里,您將實現 TCP,以提供一對計算機之間的字節流抽象由不可靠的數據報網絡分隔。
  我為什么要這樣做?在不同的基礎上提供服務或抽象不太可靠的服務是網絡中許多有趣問題的原因。結束在過去的 40 年里,研究人員和從業者已經找到了如何傳達各種事物——消息和電子郵件、超鏈接文檔、搜索引擎、聲音和視頻、虛擬世界、協作文件共享、數字貨幣——通過互聯網。TCP 本身的作用,使用不可靠的數據報提供一對可靠的字節流,是這方面的經典例子之一。一個合理的觀點認為 TCP 實現算作地球上使用最廣泛的非平凡計算機程序。
實驗室作業將要求您以模塊化方式構建 TCP 實現。
 

CS144:計算機網絡簡介

1. 在實驗 1 中,您將實現一個流重組器——一個拼接小塊的模塊字節流(稱為子串,或段)返回到連續流正確順序的字節數。
2. 在實驗 2 中,您將實現處理入站字節流的 TCP 部分:TCP接收器。這涉及考慮 TCP 將如何表示每個字節的位置在流中 - 稱為“序列號”。TCPReceiver 負責告訴發送者 (a) 它能夠組裝多少入站字節流成功(這稱為“確認”)和(b)發送方還有多少字節允許立即發送(“流量控制”)。
3. 在實驗 3 中,您將實現處理出站字節流的 TCP 部分:TCP發送器。當發送方懷疑它傳輸的一個段時,它應該如何反應一路上迷路了,從未到達接收器?什么時候再試一次並重新傳輸丟失的段?
4. 在實驗 4 中,您將把之前的工作與實驗結合起來,創建一個工作TCP 實現:一個包含 TCPSender 和 TCPReceiver 的 TCPConnection。您將使用它與世界各地的真實服務器對話。
 

將子串按順序排列

     在本實驗和下一個實驗中,您將實現一個 TCP 接收器:接收數據報並將它們轉換為可靠的字節流以供用戶讀取(就像您的webget 程序從實驗 0 中的 webserver 讀取字節流)。
TCP 發送方將其字節流分成短段(子串不超過每個大約 1,460 字節),以便它們每個都適合數據報。但是網絡可能重新排序這些數據報,或者丟棄它們,或者多次傳送它們。接收者必須將這些段重新組合成它們開始時的連續字節流。
     在本實驗中,您將編寫負責此重組的數據結構:流重組器。它將接收子串(由一串字節和索引組成較大流中字符串的第一個字節)並提供一個 ByteStream 與所有數據正確排序。
界面如下所示:
// 構造一個 `StreamReassembler`,最多可以存儲 `capacity` 個字節。
StreamReassembler( const size_t capcity);
// 接收一個子字符串並將任何新的連續字節寫入流中。
//
// `data`: 段
// `index` 表示 `data` 中第一個字節的索引(按順序放置)
// `eof`:該段的最后一個字節是整個流中的最后一個字節
void push_substring ( const string & data, const uint64_t index, const bool eof);
// 訪問重組的字節流
const ByteStream stream_out();
// 已存儲但尚未重組的子串中的字節數
size_t unassembled_bytes () const ;
// 內部狀態是否為空(輸出流除外)
bool empty() const ;

 

重組器的完整(公共)接口由 StreamReassembler 類描述在流 reassembler.hh 標頭中。你的任務是實現這個類。您可以添加您希望 StreamReassembler 類的任何私有成員和成員函數,但是你不能改變它的公共接口。
⋆我為什么要這樣做?
因為它是 TCP 對段的健壯性的核心 重新排序,它將使處理傳入的段變得更加容易。

常見問題

• 整個流中第一個字節的索引是多少?零。
• 我的實施應該有多高效?我們不會指定一個特定的還沒有效率的概念,但請不要將其視為構建粗獷空間的挑戰-或時間效率低下的數據結構——這種數據結構將成為你的基礎TCP 實現。
• 如何處理不一致的子串?你可能認為他們沒有存在。也就是說,您可以假設存在唯一的底層字節流,並且所有子字符串是它的(准確)切片。
• 我可以使用什么?您可以使用標准庫的任何您覺得有用的部分。特別是,我們希望您至少使用一種數據結構。
• 什么時候應該將字節寫入流?盡快。唯一的情況在哪個字節不應該在流中是當它之前有一個字節時還沒有被“推”過。
• 子串可以重疊嗎?是的。
• 我是否需要向StreamReassembler 添加私有成員?是的。由於字串可能以任何順序到達,您的數據結構將必須“記住”子字符串,直到它們准備好放入流中——也就是說,直到它們之前的所有索引都被填滿。

實驗結果

實驗總結

   這個實驗主要是利用lab0寫的讀寫字節流,完成一個字節重組器,對不按序到達的字串重組,排序好后讀入字節流中,測試程序會自動讀取字節流中的數據.

 

 

根據實驗指示圖,我們需要寫一個數據結構,維持容量為capcity,綠色部分代表已經讀入字節流但是還沒讀出的部分,紅色部分代表還沒有重組的部分,藍色部分表示已經從字節流中讀出的部分。

我們接受到data后,將不能重組的部分存儲起來,如果能重組則直接重組,然后直接讀入字節流。

不用擔心讀入失敗問題,因為初始化時設置字節流的最大容量capcity和我們的數據結構維持的width是一樣的,即綠色的部分永遠小於width,所以只要能放進width的部分就可以直接讀入字節流.

接受到data時,可能有超界的情況,比如:

 

 此時我們需要去掉兩端多余的部分,保證data在不越界。

藍色部分已經從字節流中讀出,綠色部分也讀入了字節流,我們實際需要儲存的就只有紅色部分,即未重組的子字符串

而每次push子字符串的時候需要重組,重組就要找到該子字符串附近能夠重組的子字符串進行重組,於是想到了用set的upper_bound去快速查找

我們用set,自定義一個子字符串結構體node,然后利用其數據的start_index作為排序依據,然后每次push子字符串s1時,

先進行重組操作

用upper_bound找到start_index比它小(迭代器--)的另一個子字符串s2,然后向后一個一個查詢,如果與s1可以重組,則將在s2的多出部分添加到s1,並從set中刪除s2,最后將s2添加到set中

再進行讀入判斷

set是按照startindex排序的,所以我們只需要判斷第一個節點的start_index是否為目前下一個需要讀入到字節流的index,即圖中的first_unressmbled,如果是,則表示可以讀入,將第一個節點數據讀入字節流,然后從set中刪除該節點即可

eof判斷

創建一個eof_index標記,當傳入參數eof為true時,我們設置eof_index為data.size()+index,然后再每次數據讀入字節流的時候判斷是否讀入數據已經到達eof_index,到達了就調用字節流的end_input函數結束讀入。

stream_reassembler.hh

#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

#include "byte_stream.hh"

#include <cstdint>
#include <string>
#include <set>

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.
    struct node{
      size_t start_idx=0;
      size_t end_idx=0;
      std::string data="";
      node(size_t index,const std::string &DATA){
        this->start_idx=index;
        this->data=DATA;
        this->end_idx=index+DATA.size()-1;
      }
      bool operator<(const node &b)const{
        if(start_idx!=b.start_idx)return start_idx<b.start_idx;
        else return end_idx<b.end_idx;
      }
    };
    ByteStream _outputStream;  //!< The reassembled in-order byte stream
    size_t _capacity=0;    //!< The maximum number of bytes
    std::set<node>_seg_buffer={};
    size_t _input_end_idx=UINT64_MAX;
    size_t _cur_idx=0;
    size_t _unassembled_bytes=0;
  private:
    void mergeTo(const node&a,node &b);
  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receive a substring and write any newly contiguous bytes into the stream.
    //!
    //! The StreamReassembler will stay within the memory limits of the `capacity`.
    //! Bytes that would exceed the capacity are silently discarded.
    //!
    //! \param data the substring
    //! \param index indicates the index (place in sequence) of the first byte in `data`
    //! \param eof the last byte of `data` will be the last byte in the entire stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _outputStream; }
    ByteStream &stream_out() { return _outputStream; }
    //!@}

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been pushed more than once, it
    //! should only be counted once for the purpose of this function.
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;
};

#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

 

stream_reassembler.cc

#include "stream_reassembler.hh"
#include<iostream>

// Dummy implementation of a stream reassembler.

// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.

// You will need to add private members to the class declaration in `stream_reassembler.hh`


using namespace std;

StreamReassembler::StreamReassembler(const size_t capacity) : _outputStream(capacity), _capacity(capacity) {}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::mergeTo(const node &a,node &b){
    if(a.start_idx<b.start_idx){
      b.data.insert(b.data.begin(),a.data.begin(),a.data.begin()+b.start_idx-a.start_idx);
      b.start_idx=a.start_idx;
    }
    if(a.end_idx>b.end_idx){
      b.data.insert(b.data.end(),a.data.end()-(a.end_idx-b.end_idx),a.data.end());
      b.end_idx=a.end_idx;
    }
}
//先重組,再提交到緩沖區
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    
    size_t max_idx=_cur_idx-_outputStream.buffer_size()+_capacity-1;
    if(eof){_input_end_idx=index+data.size();}
    node nd(index,data);

    //邊界情況
    //如果已進入緩沖區的seg包含了該seg,或者該seg的起始idx都大於窗口最大值,或者seg為空直接丟棄
    if(_cur_idx>nd.end_idx||nd.start_idx>max_idx||data.empty()){
        //可能是eof延遲標記   
        if(_cur_idx>=_input_end_idx)_outputStream.end_input();
         return ;
    }
    //去除左右越界數據
    if(nd.start_idx<_cur_idx){
        nd.data=data.substr(_cur_idx-nd.start_idx);
        nd.start_idx=_cur_idx;
    }
    if(nd.end_idx>max_idx){
        nd.data=nd.data.substr(0,nd.data.size()-(nd.end_idx-max_idx));
        nd.end_idx=max_idx;
    }
    //處理完邊界情況,當前seg的data都在nextIdx和maxIdx之間,把可以重合的seg重組
    if(!_seg_buffer.empty()){
       set<node>::iterator it=_seg_buffer.upper_bound(nd);
       if(it!=_seg_buffer.begin()){
           it--;
       }
       while(it!=_seg_buffer.end()){
        //如果有交集,則刪除迭代器,重合部分添加在nd中,別問我為什么判斷要寫這么丑,直接放里面不行嘛,嗯,你可以試試
        int a=it->end_idx-(nd.start_idx-1),b=(nd.end_idx+1)-it->start_idx;
           if(a>=0&&b>=0){
              mergeTo(*it,nd);
              _unassembled_bytes-=it->data.size();
              it=_seg_buffer.erase(it);
           }
           //沒有交集,但是seg已經在右側,再往右找也找不到能產生交集的seg,退出
           else if(it->start_idx>nd.end_idx){
               break;
           }else{
               it++;
           }
       }
    }
    //提交到緩沖區,找seg最前的部分,判斷是否能讀入字節流
    if(nd.start_idx==_cur_idx){
        _outputStream.write(nd.data);
        _cur_idx=nd.end_idx+1;
        if(_cur_idx>=_input_end_idx)_outputStream.end_input();
    }else{
    _seg_buffer.insert(nd);
    _unassembled_bytes+=nd.data.size();
    }
}

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }

bool StreamReassembler::empty() const { return _unassembled_bytes==0; }

 

 

 

 

 

 

 

 

 

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM