0%

Stanford-CS144-Sponge 笔记 - Lab 3: The TCP Sender

第三个实验要求实现一个TCP发送器。

1. OverView

现在,在Lab 3中,您将实现连接的另一端。TCPSender是一种工具,它将输出的字节流转换为将成为不可靠数据报的有效负载的段。最后,在实验4中,您将结合前面的实验来创建一个工作的TCP实现:一个包含TCPSender和TCPReceiver的TCPConnection。您将使用它与Internet上的真实服务器通信。
https://files.epis2048.net/uploads/2022/图像7.png

2. The TCP Sender

TCP是一种协议,它通过不可靠的数据报可靠地传递一对流控制的字节流(每个方向一个)。两方参与到TCP连接中,每一方同时充当“发送方”(自己发送的字节流)和“接收方”(传入的字节流)。这两方称为连接的“端点”,或“对等点”。

本周,您将实现TCP的“发送方”部分,负责从ByteStream(由一些发送方应用程序创建并写入)读取数据,并将流转换为输出TCP段的序列。在远程端,TCP接收方将这些段(那些到达的段—它们可能不会全部到达)转换回原始的字节流,并向发送方发送确认和窗口广告。

TCP发送方和接收方各自负责TCP段的一部分。TCP发送方写入Lab 2中与TCPReceiver相关的TCPSegment的所有字段:序列号、SYN标志、有效载荷和FIN标志。然而,TCP发送方只读取接收方写入的段中的字段:ackno和窗口大小。下面是一个TCP段的结构,只突出显示了将被发送方读取的字段:

https://files.epis2048.net/uploads/2022/图像9.png

TCPSender需要干的:

  • 跟踪接收者的窗口(处理传入的acknos和window sizes)。
  • 如果可能,通过从ByteStream读取,创建新的TCP段(如果需要的话,包括SYN和FIN标志),并发送它们来填充窗口。发送方应该持续发送TCP段,直到窗口满或者ByteStream为空。
  • 跟踪哪些片段已经发送但还没有被接收方确认——我们称这些为“未完成的”片段。
  • 如果已经发送了足够长的时间,但还没有被确认,则重新发送。

2.1 How does the TCPSender know if a segment was lost?

你的TCPSender将发送一堆TCP段(TCPSegments)。每个都将包含一个来自输出ByteStream的(可能为空)子字符串,使用sequence number来指示其在流中的位置,并在流的开始处标记SYN标志,在流的结束处标记FIN标志。

除了发送这些段外,TCPSender还必须跟踪其未完成的段,直到它们占用的序列号完全被接收方确认接收。TCPSender的所有者将周期性地调用TCPSender的tick方法,来表示时间的流逝。TCPSender负责跟踪接收方所有未处理的TCPSegments集合,并判断最早发送的段是否在没有确认的情况下(也就是说,在没有确认其所有序列号的情况下)长时间未处理。如果是,则需要重新传输(再次发送)。

以下是“超时”的规则。我们会给你一些合理的单元测试在本周,和更全面的集成测试在实验室4一旦你完成整个TCP实现。只要100%通过这些测试,并且实现合理,就没问题。

  1. 每隔几毫秒,TCPSender的tick方法将被一个参数调用,该参数告诉TCPSender从上次调用该方法到现在已经经过了多少毫秒。使用此参数来维护TCPSender已存在的总毫秒数的概念。请不要尝试从操作系统或cpu调用任何“时间”或“时钟”函数——tick方法是您访问时间流逝的唯一方法。这保持了事物的确定性和可测试性。
  2. 在构造TCPSender时,给出一个参数,告诉它重传超时(RTO)的“初始值”。RTO是指在重新发送一个未完成的TCP段之前等待的毫秒数。RTO的值会随时间变化,但“初始值”保持不变。启动器代码将RTO的“初始值”保存在一个名为initial_retransmission_timeout_的成员变量中。
  3. 您将实现重传计时器:一个可以在特定时间启动的闹钟,一旦RTO超时,闹钟就会响(或“过期”)。我们强调,这种时间流逝的概念来自于被调用的tick方法——而不是通过获取一天中的实际时间。
  4. 每次发送包含数据(序列空间中非零长度)的段时(无论是第一次还是重传),如果计时器没有运行,则启动它,使其在RTO毫秒(RTO的当前值)之后过期。这里的“过期”是指时间将在未来用完一定的毫秒数。
  5. 当所有未完成的数据被确认后,停止重传计时器。
  6. 如果tick被调用并且重传计时器已经过期:
    a) 重传TCP接收端尚未完全确认的最早(最低序列号)的段。您需要将未被确认接收的段存储在一些内部数据结构中,这样才能实现这一点。
    b) 如果窗口大小非零:i. 记录连续重传的数量,每次重传都要增加它,因为你刚刚重传了一些东西。您的TCPConnection将使用此信息来决定连接是否失败(连续重传太多),需要中止。ii. RTO值翻倍。这被称为“指数后退”——它降低了糟糕网络的重传速度,以避免进一步破坏。
    c) 重置重传计时器并启动它,使其在RTO毫秒后过期(注意,你可能已经将RTO的值翻倍了!)
  7. 当接收方向发送方发送一个确认新数据成功接收的ackno(这个ackno反映了一个比之前任何ackno都大的absolute sequence):
    a) 将RTO设置回它的“初始值”。
    b) 如果发送方有任何未完成的数据,重新启动重传定时器,使其在RTO毫秒(RTO的当前值)之后过期。
    c) 将“连续重传”的计数重置为零。

2.2 Implementing the TCP sender

我们已经讨论了TCP发送方的基本思路(给定一个传出的字节流,将它分成几个部分,发送给接收方,如果它们在一定时间内没有被接收方确认收到,则继续重新发送它们)。我们已经讨论了什么时候得出一个传出的段丢失了的结论,需要重新发送。

现在是时候介绍您的TCPSender将提供的具体接口了。它需要处理四个重要事件,每个事件都可能发送一个TCPSegment:

  1. void fill_window()
    TCPSender被要求填充窗口:它从输入的ByteStream中读取并以TCPSegments的形式发送尽可能多的字节,只要窗口中有新的字节需要读取并且有可用的空间。
    您将需要确保您发送的每个TCPSegment完全符合接收者的窗口。使每个单独的TCPSegment尽可能大,但不大于TCPConfig::MAX PAYLOAD SIZE(1452字节)给出的值。
    您可以使用TCPSegment::length_in_sequence_space()方法来统计一个段所占用的序列号的总数。请记住,SYN和FIN标志也各自占用一个序列号,这意味着它们占用窗口中的空间。
  2. void ack_received(const WrappingInt32 ackno, const uint16_t window_size)
    从接收端接收到一个段,包含传递窗口的新左边(= ackno)和右边(= ackno + 窗口大小)。TCPSender应该检查它的未被确认接收的段的集合,并删除任何现在已经完全确认的段(确认号大于段中的所有序列号)。如果有新的空间打开,则TCPSender应再次填充窗口。
  3. void tick( const size t ms since last tick )
    时间已经过去了—自上次调用该方法以来已经经过了一定的毫秒数。发送方可能需要重新发送一个未完成的段。
  4. void send empty segment()
    TCPSender应该生成并发送一个在序列空间中为零长度的TCPSegment,并且正确设置序列号。如果所有者(您将在下周实现的TCPConnection)想要发送一个空的ACK段,这是非常有用的。
    注意:像这样的段,它不占用序列号,不需要被跟踪为“未完成”,也永远不会被重传。

2.3 Theory of testing

为了测试您的代码,测试套件将期望它在一系列情况下演进——从发送第一个SYN段,到发送所有数据,再到发送FIN段,最后确认FIN段。我们不认为您想要创建更多的状态变量来跟踪这些“状态”——状态只是由您的TCPSender类已经公开的公共接口定义的。但是为了帮助您理解测试输出,这里有一个TCPSender在流生命周期中的预期演化图。(在实验4之前,您不必担心错误状态或RST标志。)

https://files.epis2048.net/uploads/2022/图像11.png

2.4 FAQs

  • Q:我如何“发送”一个段?
  • A:将它推到segments_out队列中。就您的TCPSender而言,只要您将其推入此队列,就不需要考虑别的了,他的主人很快就会来将他认领走。
  • Q:等等,我该如何既“发送”一个片段,又跟踪该片段的突出内容,这样我就知道以后要重新发送什么内容?我不需要复制每个片段吗?那是浪费吗?
  • A:当您发送一个包含数据的段时,您可能希望将它推到段输出队列中,并在数据结构中保留它的内部副本,以便您跟踪未完成的段,以便可能的重传。这证明不是很浪费,因为段的有效负载存储为一个引用计数的只读字符串(一个Buffer对象)。所以不用担心,它实际上并没有复制有效载荷数据。
  • Q:在我从接收方得到ACK之前,我的TCPSender应该假设接收方的窗口大小是多少?
  • A:One Byte。
  • Q:如果一个确认只部分地确认一些未完成的部分,我该怎么办?我应该剪掉被确认的字节吗?
  • A:TCP发送方可以这样做,但是对于这个类来说,没有必要太花哨。将每个段视为完全未处理,直到它被完全确认—它占用的所有序列号都小于ackno。
  • Q:如果我发送了包含“a”、“b”和“c”的三个单独的片段,它们从来没有被确认,我可以在一个包含“abc”的大片段中重新发送它们吗?还是我需要逐个重新传输每个片段?
  • A:TCP发送方也可以这样做,但对于这个类来说,不必太花哨。只需单独跟踪每个未完成的段,当重传计时器到期时,再次发送最早的未完成的段。
  • Q:我应该在我的“未完成的”数据结构中存储空段,并在必要时重新传输它们吗?
  • A:不——唯一应该被跟踪为突出的部分,并可能被重传的部分,是那些传递一些数据的部分,即。在序列空间中消耗一定的长度。一个没有占用序列号(没有负载、SYN或FIN)的段不需要被记住或重传。

解决方案
只要将实验指导书中的内容理清楚,就OK了。所有的细节实验指导书中都提到了。这里直接贴代码吧。
首先在TCPSender中增加了一些成员变量,用途都给在了注释中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// libsponge\tcp_sender.hh
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 isn_;

//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> segments_out_{};
//! 等待返回接收成功的段
std::queue<TCPSegment> segments_wait_{};

//! retransmission timer for the connection
unsigned int initial_retransmission_timeout_;
//! RTO,重传等待时间
unsigned int retransmission_timeout_;
//! 重传次数
uint16_t consecutive_retransmissions_ = 0;
//! 重传计时器
size_t retransmissions_timer_ = 0;
//! 重传计时器是否启动
bool retransmissions_timer_running_ = false;

//! outgoing stream of bytes that have not yet been sent
ByteStream stream_;

//! the (absolute) sequence number for the next byte to be sent
uint64_t next_seqno_{0};
uint64_t recv_seqno_{0};

// 标记是否开始和结束
bool syn_ = false;
bool fin_ = false;

// 尚在发送中的数据大小
uint64_t bytes_in_flight_ = 0;
// 接收方的窗口大小
uint16_t receiver_window_size_ = 0;

然后就是完成四个函数了,每一步的目的都给在了注释中。其中将发送TCP段的过程单独提出来作为一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// libsponge\tcp_sender.cc
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: isn_(fixed_isn.value_or(WrappingInt32{random_device()()}))
, initial_retransmission_timeout_{retx_timeout}
, retransmission_timeout_(retx_timeout)
, stream_(capacity) {}

uint64_t TCPSender::bytes_in_flight() const { return bytes_in_flight_; }

void TCPSender::fill_window() {
TCPSegment tcp_segment;
// 如果发送完了
if (fin_) {
return;
}
// 如果还没有开始
if (!syn_) {
tcp_segment.header().syn = true;
send_tcp_segment(tcp_segment);
syn_ = true;
return;
}
// 窗口大小,未知则假设为1
uint16_t window_size = (receiver_window_size_ > 0 ? receiver_window_size_ : 1);
// 发送结束,单独返回一个FIN包
if (stream_.eof() && recv_seqno_ + window_size > next_seqno_) {
tcp_segment.header().fin = true;
send_tcp_segment(tcp_segment);
fin_ = true;
return;
}
// 循环发送,直到无新的字节需要读取或者无可用空间
while (!stream_.buffer_empty() && recv_seqno_ + window_size > next_seqno_) {
// 根据大小来读取数据
size_t send_size =
min(TCPConfig::MAX_PAYLOAD_SIZE, static_cast<size_t>(window_size - (next_seqno_ - recv_seqno_)));
tcp_segment.payload() = stream_.read(min(send_size, stream_.buffer_size()));
// 如果发送完了,则添加FIN标志
if (stream_.eof() && tcp_segment.length_in_sequence_space() < window_size) {
tcp_segment.header().fin = true;
fin_ = true;
}
// 发送
send_tcp_segment(tcp_segment);
}
}

void TCPSender::send_tcp_segment(TCPSegment &tcp_segment) {
// 设置序号
tcp_segment.header().seqno = wrap(next_seqno_, isn_);
// 放入队列中
segments_out_.push(tcp_segment);
segments_wait_.push(tcp_segment);
// 本地保存发送的数据大小
next_seqno_ += tcp_segment.length_in_sequence_space();
bytes_in_flight_ += tcp_segment.length_in_sequence_space();
// 启动重传计时器
if (!retransmissions_timer_running_) {
retransmissions_timer_running_ = true;
retransmissions_timer_ = 0;
}
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t abs_ackno = unwrap(ackno, isn_, next_seqno_);
// 超出了窗口范围
if (abs_ackno > next_seqno_) {
return;
}
// 设置新的窗口大小
if (abs_ackno >= recv_seqno_) {
recv_seqno_ = abs_ackno;
receiver_window_size_ = window_size;
}
// 删除已经确认发送成功的段
bool pop = false;
while (!segments_wait_.empty()) {
TCPSegment tcp_segment = segments_wait_.front();
// 当前队列头的段还未发送成功
if (abs_ackno <
unwrap(tcp_segment.header().seqno, isn_, next_seqno_) + tcp_segment.length_in_sequence_space()) {
return;
}
// 发送成功,要出队列,修改发送的数据大小
segments_wait_.pop();
bytes_in_flight_ -= tcp_segment.length_in_sequence_space();
// 重置重传超时时间、重传次数与重传计时器
retransmission_timeout_ = initial_retransmission_timeout_;
consecutive_retransmissions_ = 0;
retransmissions_timer_ = 0;
pop = true;
}
// 如果有新的空间打开,则尝试填充之
if (pop) {
fill_window();
}
// 重传计时器是否启动取决于发送方是否有未完成的数据
retransmissions_timer_running_ = !segments_wait_.empty();
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
// 重传计时器是否启动
if (!retransmissions_timer_running_) {
return;
}
// 记录时间
retransmissions_timer_ += ms_since_last_tick;
// 已超时
if (retransmissions_timer_ >= retransmission_timeout_ && !segments_wait_.empty()) {
// 重传数据
TCPSegment tcp_segment = segments_wait_.front();
segments_out_.push(tcp_segment);
// 重置计时器
retransmissions_timer_ = 0;
// 记录连续重传的数量,并将RTO值翻倍
if (receiver_window_size_ > 0 || tcp_segment.header().syn) {
consecutive_retransmissions_++;
retransmission_timeout_ *= 2;
}
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return consecutive_retransmissions_; }

void TCPSender::send_empty_segment() {
TCPSegment tcp_segment;
tcp_segment.header().seqno = wrap(next_seqno_, isn_);
segments_out_.push(tcp_segment);
}