CS144學習(2)TCP協議實現


Lab1-4 分別是完成一個流重組器,TCP接收端,TCP發送端,TCP連接四個部分,將四個部分組合在一起就是一個完整的TCP端了。之后經過包裝就可以進行TCP的接收和發送了。

代碼全部在github上了。

Lab1 流重組器

這一個實驗是要實現一個流重組器,傳入數據的片段以及起始位置,之后對其進行重組,並盡快將以及重組完成的數據輸出。

這里我使用的是紅黑樹來實現,也就是C++的std::set來實現。將未重組完成的碎片保存在紅黑樹中,當新碎片到達時就盡可能地將該碎片與已有的碎片進行合並,保證紅黑樹中沒有重疊的碎片。

這一個實驗的問題就是要考慮的情況有很多,當用lower_bound()找到插入位置后,要對前面和后面的碎片判斷能否合並,合並的情況也有很多種,包括部分重疊、正好接上、完全覆蓋等情況;而且一個新的碎片可能會一次覆蓋掉很多碎片。這一部分代碼我寫的比較混亂,因為寫完之后測試發現有情況沒考慮到,然后就只能打補丁,於是就越來越混亂了。

而盡快輸出這個條件還是很容易的,如果當前到達碎片能夠直接輸出的話,就再判斷一下樹中第一個碎片能否輸出,因為前面保證了不會有重疊碎片,所以可以只對第一個碎片進行判斷。

Lab2 TCP接收端

這個實驗是基於上一個的流重組器來實現一個TCP接收端,這一個還是比較簡單的。就是前面流重組器的一些BUG可能會在這個實驗里面被檢測到,要回去改代碼。

首先是實現一個WrappingInt32,因為TCP的序號是32位的,並且是可能發生溢出的,而在流重組器里面使用的序列號是64位,因此需要實現函數來進行轉換,將64位的相對序列號根據ISN轉換成32位的絕對序列號。

#include "wrapping_integers.hh"

using namespace std;

WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
    uint64_t res = isn.raw_value() + n;
    return WrappingInt32{static_cast<uint32_t>(res)};
}

uint64_t abs(uint64_t a, uint64_t b) {
    if (a > b) {
        return a - b;
    } else {
        return b - a;
    }
}

uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
    uint64_t pre = checkpoint & 0xffffffff00000000;
    uint64_t num;
    if (n.raw_value() >= isn.raw_value()) {
        num = n.raw_value() - isn.raw_value();
    } else {
        num = 0x0000000100000000;
        num += n.raw_value();
        num -= isn.raw_value();
    }
    uint64_t a = pre + num;
    uint64_t b = a + 0x0000000100000000;
    uint64_t c = a - 0x0000000100000000;
    // b a c
    if (abs(a, checkpoint) < abs(b, checkpoint)) {
        if (abs(a, checkpoint) < abs(c, checkpoint)) {
            return a;
        } else {
            return c;
        }
    } else {
        return b;
    }
}

最后就是對流重組器進行一下包裝,計算出acknowindow_size提供給后面使用,處理一下SYN和FIN標記就行了。

#include "tcp_receiver.hh"

using namespace std;

void TCPReceiver::segment_received(const TCPSegment &seg) {
    if (!_isn.has_value()) {
        if (seg.header().syn) {
            _isn = seg.header().seqno + 1;
        } else {
            std::cerr << "Error: connection not build" << std::endl;
            return;
        }
    }

    bool eof = seg.header().fin;
    std::string&& payload = std::string(seg.payload().str());
    if (seg.header().seqno == _isn.value() - 1 && !seg.header().syn && _reassembler.expect() < 0x0000ffff) {
        // wrong packet seqno == isn
        return;
    }
    uint64_t index = unwrap(seg.header().seqno + (seg.header().syn ? 1 : 0), _isn.value(), _reassembler.expect());
    _reassembler.push_substring(payload, index, eof);
}

optional<WrappingInt32> TCPReceiver::ackno() const {
    if (_isn.has_value()) {
        return { wrap(_reassembler.expect(), _isn.value()) + (_reassembler.stream_out().input_ended() ? 1 : 0) };
    } else {
        return std::nullopt;
    }
}

size_t TCPReceiver::window_size() const {
    return _capacity - _reassembler.stream_out().buffer_size();
}

Lab3 TCP發送端

在這個實驗里面就要考慮到TCP的一些細節了,包括SYNFIN包的發送,ACK的處理,超時重傳的實現了。

SYN包

之前我還在考慮客戶端和服務端的SYN包應該是不同的,應該如何處理;而實際上兩個包是相同的,都是攜帶SYN和初始序列號,不同的地方就是服務端的SYN要同時對客戶端的SYN的包進行ACK。但ACK的處理是在TCP連接部分進行處理的,也就是說在TCP發送端里,只要發出一個SYN包就行了,剩下的不需要考慮。

而應該在什么時候發出SYN包呢,一開始我是在構造函數中就構造一個SYN包放到發送隊列里面。這個做法在這個實驗的測試里面是沒有問題的,但是對於下一個實驗就有問題了。因為服務端一開始是處於LISTEN狀態,而這個狀態下不應該有包被發出。因此,SYN包的發送應該放在fill_window()函數中,如果沒有發送過SYN包,就先將SYN包發送出去。

對於一個最簡單的SYN包,就只需要將SYN位置1,設置初始序列號seqno就行了。注意SYN是要占用一個序列號的。

FIN包

當發送流被用戶程序結束后,就可以發送FIN包來關閉一個方向的連接了。這一個包的發送還是比較簡單的,只要在fill_window中判斷是否結束就行了。FIN包是可以和數據包一起發送的,在發送數據時發現流結束了,就將該數據包的FIN標志置1就行了。FIN也是要占用一個序列號的。當收到對方對FIN包的ACK后,就說明順利關閉了。

ACK的處理

ACK數據會通過ack_received函數來通知發送端,當收到一個ACK后,就可以將發送窗口向右滑動,注意判斷一下ACK是不是之前的ACK,避免窗口左移。最后將所有被ACK了的包從等待確認的隊列中移除就行了。

重傳

在這個實驗里面只要求實現的是超時重傳機制,但我也加入了快速重傳機制。理論上當一個包超時之后就要對其進行重傳,也就是每個包都要有定時器來負責重傳,而這樣的代價是很高的。因此,實現中是對每個TCP連接設置一個定時器,當時間超過RTO后進行重傳,定時器的規則如下:

  • 當發送一個包並且定時器為關閉狀態:打開定時器
  • 當發送一個包並且定時器為打開狀態:不做任何修改
  • 當收到一個ACK並且所有包都被ACK了:關閉定時器
  • 當收到一個ACK並且仍有包未被ACK:重開定時器

超時重傳使用的是指數退避算法,當進行了一次超時重傳后,下一次超時的時間就會翻倍,也就是1RTO,2RTO,4RTO,8RTO…

當定時器超時后,就需要對包進行重傳,在本實驗里面只要重傳第一個包就行了。這種方法的缺點就是后面丟失的包也要等到第一個被ACK了才能重發,時間會比較長。另一種選擇是重傳所有包,而這種的缺點就是會加大網絡負擔。因此,為了解決這種問題,就引入了其他的機制。

快速重傳

快速重傳是指當收到三個重復ACK(不包括第一次ACK)的時候,就立即進行重傳,這種方法是數據驅動而不是時間驅動,可以避免超時重傳的速度較慢的問題。而這仍然存在是重傳一個還是所有的問題。

SACK

SACK也就是選擇重傳機制,接收端通過SACK來確認已收到的片段,從而對重傳算法進行優化,可以不用對所有包進行重傳。

而SACK存在接收方Reneging的問題,即接收方有權把已經SACK的數據給丟棄。這種丟棄是不被鼓勵但還是可能發生的。因此,發送方不能完全依賴SACK,還是要依賴ACK,並維護定時器,如果后續的ACK沒有增長,那么還是要對已經SACK的數據進行重傳。同時,接收端也永遠不能把SACK的包標記為ACK。

#include "tcp_sender.hh"
#include "tcp_config.hh"

#include <random>
#include <cassert>

using namespace std;

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity) { }

uint64_t TCPSender::bytes_in_flight() const {
    return _next_seqno - _expect_ack;
}

void TCPSender::fill_window() {
    if (!_syn_sent) {
        TCPSegment seg;
        seg.header().syn = true;
        seg.header().seqno = wrap(0, _isn);
        _segments_out.push(seg);
        _seg_not_ack.push(seg);
        _next_seqno = 1;
        _retrans_timer = _tick + _initial_retransmission_timeout;
        _syn_sent = true;
    }
    uint64_t remain = _window_size - bytes_in_flight();
    bool send = false;
    if (_expect_ack != 0) {
        // SYN received
        while (remain > 0 && _stream.buffer_size() > 0) {
            // send segment
            uint64_t send_bytes = min(remain, TCPConfig::MAX_PAYLOAD_SIZE);
            string payload = _stream.read(send_bytes);
            TCPSegment seg;
            seg.header().seqno = wrap(_next_seqno, _isn);
            seg.payload() = move(payload);
            _next_seqno += seg.length_in_sequence_space();
            remain = _window_size - bytes_in_flight();
            if (_stream.eof() && remain > 0 && !_fin_sent) {
                seg.header().fin = true;
                _next_seqno += 1;
                _fin_sent = true;
            }
            _segments_out.push(seg);
            _seg_not_ack.push(seg);
            send = true;
        }
    }
    if (_stream.eof() && remain > 0 && !_fin_sent) {
        // send FIN
        TCPSegment seg;
        seg.header().fin = true;
        seg.header().seqno = wrap(_next_seqno, _isn);
        _segments_out.push(seg);
        _seg_not_ack.push(seg);
        _next_seqno += 1;
        _fin_sent = true;
        send = true;
    }

    if (send && _retrans_timer == 0) {
        // open timer
        _retrans_timer = _tick + _initial_retransmission_timeout;
        _consecutive_retransmissions = 0;
        _rto_back_off = 0;
    }
}

void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    _window_size = window_size;
    _do_back_off = 1;
    if (_window_size == 0) {
        _window_size = 1;
        _do_back_off = 0;
    }
    uint64_t ack = unwrap(ackno, _isn, _expect_ack);
    if (ack <= _next_seqno && ack > _expect_ack) {
        if (ack == _expect_ack) {
            _same_ack++;
        } else {
            _same_ack = 0;
        }
        _expect_ack = ack;
        if (bytes_in_flight() == 0) {
            // close timer
            _retrans_timer = 0;
            _consecutive_retransmissions = 0;
            _rto_back_off = 0;
        } else {
            // reopen timer
            _retrans_timer = _tick + _initial_retransmission_timeout;
            _consecutive_retransmissions = 0;
            _rto_back_off = 0;
        }
    }

    // remove all acked packets
    while (!_seg_not_ack.empty()) {
        TCPSegment seg = _seg_not_ack.front();
        if (seg.length_in_sequence_space() + unwrap(seg.header().seqno, _isn, _expect_ack) <= _expect_ack) {
            _seg_not_ack.pop();
        } else {
            break;
        }
    }

    // faster retransmit
    if (_same_ack == 3 && !_seg_not_ack.empty()) {
        // cout << "!! FASTER RETRANSMIT" << endl;
        _same_ack = 0;
        TCPSegment seg = _seg_not_ack.front();
        _segments_out.push(seg);
        _consecutive_retransmissions += 1;
        _rto_back_off += _do_back_off;
        _retrans_timer += _initial_retransmission_timeout << _rto_back_off;
    }
}

void TCPSender::tick(const size_t ms_since_last_tick) {
    _tick += ms_since_last_tick;
    
    if (!_seg_not_ack.empty() && _tick >= _retrans_timer) {
        // retransmit the first packet
        // cout << "retransmit" << endl;
        TCPSegment seg = _seg_not_ack.front();
        _segments_out.push(seg);
        _consecutive_retransmissions += 1;
        _rto_back_off += _do_back_off;
        _retrans_timer = _tick + (_initial_retransmission_timeout << _rto_back_off);
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions; }

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = wrap(_next_seqno, _isn);
    _segments_out.push(seg);
}

Lab4 TCP連接

這個實驗就是將之前的發送端和接收端組合起來,成為一個完整的TCP peer。

主要的工作就是將發送的數據從發送端的隊列中取出,再放到發送隊列中去;發送ack包進行確認;對RST進行處理;對連接的關閉和TIME_WAIT狀態進行處理。

當調用connect函數時,就可以調用fill_window生成SYN包,然后發送出去。

當收到一個包之后,就將對於信息交給發送端和接收端進行處理,然后進行ACK,當發送隊列有包時直接附帶ACK就行了,如果沒有就要生成一個空包進行ACK,注意當接收的包只是一個ACK包而沒有任何數據的話就不要進行ACK。當接收端收到所有數據以及FIN包之后就會關閉接收端的輸入流。

當連接的輸入流關閉,就可以調用發送端的end_inputfill_window來生成FIN包並發送。

連接的關閉

TCP連接的關閉分為兩種情況,主動關閉和被動關閉。

當發送流先結束時,就要進行主動關閉,發送FIN進入FIN_WAIT_1狀態,收到ACK后進入FIN_WAIT_2狀態,當收到對方的FIN包並進行ACK之后,就進入TIME_WAIT狀態,在TIME_WAIT狀態下要等待2MSL(Linux中一般為60s)才能釋放連接。

TIME_WAIT狀態的目的就是保證最后一個ACK包被對方接收到,因為不會對ACK進行ACK,就只能使用這種方式。如果ACK沒有被對方接收到,那么對方就會重發FIN包,這時候就可以再次進行ACK。如果直接釋放而不進行TIME_WAIT的話,那么下一個使用該端口的連接就可能會收到上一個連接重傳的FIN包,從而導致混亂。

當對方先關閉時就是被動關閉,當收到FIN並ACK后,就進入CLOSE_WAIT狀態。等到發送流結束后,發送FIN進入LAST-ACK狀態,收到對方ACK后就可以關閉連接了,當被動關閉時,就不需要TIME_WAIT了。

#include "tcp_connection.hh"

#include <iostream>

using namespace std;

void TCPConnection::send_all_segments() {
    if (_closed) return;
    while (!_sender.segments_out().empty()) {
        TCPSegment& seg = _sender.segments_out().front();
        if (_receiver.ackno().has_value()) {
            seg.header().ack = true;
            seg.header().ackno = _receiver.ackno().value();
        }
        size_t max_win = numeric_limits<uint16_t>().max();
        seg.header().win = min(_receiver.window_size(), max_win);
        _segments_out.push(seg);
        _sender.segments_out().pop();
    }

    if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) {
        if (_linger_after_streams_finish) {
            _time_wait = true;
        }
    }
}

size_t TCPConnection::remaining_outbound_capacity() const {
    return _sender.stream_in().remaining_capacity();
}

size_t TCPConnection::bytes_in_flight() const {
    return _sender.bytes_in_flight();
}

size_t TCPConnection::unassembled_bytes() const {
    return _receiver.unassembled_bytes();
}

size_t TCPConnection::time_since_last_segment_received() const {
    return _ticks - _last_ack_time;
}

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_syn_sent && !seg.header().syn) return;
    if (seg.header().rst) {
        // reset connection
        _sender.stream_in().set_error();
        _receiver.stream_out().set_error();
        _linger_after_streams_finish = false;
    }

    _last_ack_time = _ticks;
    _receiver.segment_received(seg);
    _sender.ack_received(seg.header().ackno, seg.header().win);
    _sender.fill_window();
    _syn_sent = true;

    if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) {
        // passive close
        _linger_after_streams_finish = false;
    }

    if (!_receiver.ackno().has_value()) {
        return; // no need for ack
    }
    if (_sender.segments_out().empty()) {
        // generate an empty segment to ack
        if (_receiver.stream_out().input_ended() && !seg.header().fin) {
            // no need to ack, server closed and seg not fin
        } else if (seg.length_in_sequence_space() == 0) {
            // no need to ack the empty-ack
        } else {
            _sender.send_empty_segment();
        }
    }
    // send with ack
    send_all_segments();
}

bool TCPConnection::active() const {
    if (_sender.stream_in().error() && _receiver.stream_out().error()) return false;
    return !(_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) || _time_wait;
}

size_t TCPConnection::write(const string &data) {
    size_t wrote = _sender.stream_in().write(data);
    _sender.fill_window();
    send_all_segments();
    return wrote;
}

void TCPConnection::tick(const size_t ms_since_last_tick) {
    _ticks += ms_since_last_tick;
    _sender.tick(ms_since_last_tick);

    if (_time_wait && _ticks >= _last_ack_time + _cfg.rt_timeout * 10) {
        // closed
        _time_wait = false;
        _closed = true;
    }

    if (_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS) {
        // RST
        _sender.stream_in().set_error();
        _receiver.stream_out().set_error();
        _linger_after_streams_finish = false;
        while (!_sender.segments_out().empty()) {
            // pop all segments
            _sender.segments_out().pop();
        }
        _sender.send_empty_segment();
        TCPSegment& seg = _sender.segments_out().front();
        seg.header().rst = true;
    }
    send_all_segments();
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    _sender.fill_window();
    send_all_segments();
}

void TCPConnection::connect() {
    // send SYN
    if (!_syn_sent) {
        _sender.fill_window();
        _syn_sent = true;
        TCPSegment& seg = _sender.segments_out().front();
        size_t max_win = numeric_limits<uint16_t>().max();
        seg.header().win = min(_receiver.window_size(), max_win);
        _segments_out.push(seg);
        _sender.segments_out().pop();
    }
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";
            _sender.stream_in().set_error();
            _receiver.stream_out().set_error();
            _linger_after_streams_finish = false;
            while (!_sender.segments_out().empty()) {
                // pop all segments
                _sender.segments_out().pop();
            }
            _sender.send_empty_segment();
            TCPSegment& seg = _sender.segments_out().front();
            seg.header().rst = true;
            send_all_segments();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

測試

至此整個簡單的TCP協議就實現完了,使用這個TCP協議修改Lab0中的webget,然后就可以對網站進行訪問了。使用抓包軟件就可以看到完整的連接建立、數據發送、連接關閉的過程了。但這里有一個問題不知道是為什么,使用webget訪問cs144.keithw.orgbilibili.com都能正常訪問,但是訪問www.baidu.com的時候,就會丟失連接建立之后的一個包,抓包看根本沒收到那個包(tcp previous segment not captured),不知道是我的代碼有問題還是百度的服務器使用了什么特殊的策略。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM