Lab3:the TCP Sender
# Lab3:the TCP Sender
lab3.pdf (cs144.github.io) (opens new window)
在这个 lab 中我们要实现 TCP 协议中的 Sender。
过程中参考了Stanford-CS144-Sponge 笔记 - Lab 3: The TCP Sender | 吃着土豆坐地铁的博客 (epis2048.net) (opens new window)
在发送端,我们通过维护接收窗口来实现 TCP 流量控制的机制,具体的可以看自顶向下书上的这张图。
具体的,我们维护了三个变量,send_base
、next_seqno
和 window_size
,其中 window_size
的值是通过接受来自接收方的 ACK 报文段所捎带获得的。
send_base
维护了窗口的左端点,也就是第一个发送但还未被确认的值(outstanding),next_seqno
是下一个将要发送的字节序号,显然我们此时的可发送数据范围是 [send_base, send_base + window_size)
,而剩余的空闲可发送区间是 [next_seqno, send_base + window_size)
。
实验中我们要实现函数 fill_window()
,我们每次调用该函数都会尽可能的发送数据,即把空闲区间尽量用完,我们将已发送且确认的数据和已发送未确认的数据分别用队列存储;需要注意的是,在 fill_window()
中如果此时窗口大小为 0 时,我们仍要发送一个字节的数据,防止 sender 停滞不进行。
对于已发送的数据如果超时未能收到确认,则我们还要对其进行重传操作。超时部分的计时,我们通过函数 tick()
来实现,每次超时后,我们还要将这个阈值翻倍,这样可以减少发送可能无法送到的数据。具体实现部分可以参考代码
而当数据发送出去后且收到确认时,sender 也会进行一系列的操作,它会更新 window_size
,重置计时器,更新相关的序号记录变量等,这部分逻辑在 ack_received()
中实现。
最后为了 TCP 连接的实现做准备,我们还要记录超时重传的次数以及实现一个发送空报文段的函数,前者在 TCP 连接中如果超过一定上限就会导致 RST 位被设置,并导致一些错误;后者则用来发送 ACK 确认。
具体实现代码如下:
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};
std::queue<TCPSegment> _outstanding{}; // 用来存储发送了但还未被确认的报文段
bool _syn = false;
bool _fin = false;
size_t _window_size = 0; // 当前接收窗口大小
size_t _consecutive_retransmissions = 0; // 连续重传次数
unsigned int _timer = 0; // 计时器
unsigned int _rto = 0; // 当前的 RTO
bool _timer_running = false; // 计时器运行状态
size_t send_base = 0; // 第一个发送还未被确认的序号
size_t _bytes_in_flight = 0;
// ..........
}
#include "tcp_sender.hh"
#include "tcp_config.hh"
#include <random>
// Dummy implementation of a TCP sender
// For Lab 3, please replace with a real implementation that passes the
// automated checks run by `make check_lab3`.
template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}
using namespace std;
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _rto(retx_timeout) {} // _rto 也一定要初始化
uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }
void TCPSender::fill_window() {
if (_fin) { // 已经结束了
return;
};
if (_syn == false) { // 如果这是第一个报文段(不存数据)
TCPSegment seg;
seg.header().syn = true; // 设置 syn 比特位
_syn = true;
send_segment(seg);
return;
}
size_t win = _window_size;
if (win == 0) win = 1; // 如果为 0,那么要继续发送一个字节的比特
// 没有数据了,且此时有空间
if (stream_in().eof() && send_base + win > _next_seqno) {
TCPSegment seg;
seg.header().fin = true;
_fin = true;
send_segment(seg);
return;
}
// 还有数据的话,且此时有空间
size_t remain = 0; // 计算剩余空间,即 next_seqno 到右端点(send_base + win)的距离
while ((remain = win - (_next_seqno - send_base)) > 0 && !stream_in().buffer_empty() && _fin == false) { // 具体计算的示意图可以看自顶向下书上的图
TCPSegment seg;
size_t size = std::min(TCPConfig::MAX_PAYLOAD_SIZE, remain); // 不能超过上限值
seg.payload() = _stream.read(std::min(size, _stream.buffer_size()));
// 设置 FIN 位
if (stream_in().eof() && seg.length_in_sequence_space() + 1 <= remain) { // fin 位也会占据一个序号
_fin = true;
seg.header().fin = true;
}
send_segment(seg);
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
size_t abs_ackno = unwrap(ackno, _isn, send_base);
if (abs_ackno > _next_seqno) return; // 超出了可接受范围
_window_size = window_size;
if (abs_ackno <= send_base) return; // 已接收过的 ack
send_base = abs_ackno;
while (!_outstanding.empty()) {
auto &seg = _outstanding.front();
// size_t first_seqn = unwrap(seg.header().seqno, _isn, send_base);
size_t first_seqn = unwrap(seg.header().seqno, _isn, send_base);
if (first_seqn + seg.length_in_sequence_space() <= send_base) { // 已经收到 ack 了
_bytes_in_flight -= seg.length_in_sequence_space();
_outstanding.pop();
} else {
break;
}
}
_rto = _initial_retransmission_timeout; // 重置 RTO
_consecutive_retransmissions = 0; // 重置连续重传次数
fill_window(); // 尽可能发送
if (_outstanding.size()) { // 如果还有别的已发送未确认报文段就重启计时器
_timer_running = true;
_timer = 0;
}
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timer += ms_since_last_tick;
if (_outstanding.empty()) {_timer_running = false; // 关闭计时器
return;
}
if (_timer >= _rto) { // 超时
_segments_out.emplace(_outstanding.front()); // 重传最早的那个段
_timer = 0; // 重置计时器
_timer_running = true; // 开启计时器
if (_window_size > 0 || _outstanding.front().header().syn) { // 需要满足接收窗口非零
_rto *= 2;
_consecutive_retransmissions += 1;
}
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions; }
void TCPSender::send_empty_segment() { // 无 payload, SYN, or FIN
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno, _isn);
_segments_out.emplace(seg);
}
void TCPSender::send_segment(TCPSegment &seg) {
seg.header().seqno = wrap(_next_seqno, _isn);
_bytes_in_flight += seg.length_in_sequence_space();
_next_seqno += seg.length_in_sequence_space();
_outstanding.emplace(seg);
_segments_out.emplace(seg);
if (_timer_running == false) { // 开启计时器
_timer_running = true;
_timer = 0;
}
}