0%

Stanford-CS144-Sponge 笔记 - Lab 2: The TCP Receiver

第二个实验要求实现一个TCP接收器。

1. OverView

https://files.epis2048.net/uploads/2022/图像7.png

如上图所示,这是我们要实现的TCP的各个部分。

除了写入传入的流之外,TCPReceiver还负责告诉发送方两件事:

  1. “first unassembled”的字节的index,称为“acknowledgment number”或“ackno”。这是接收方需要从发送方得到的第一个字节
  2. “first unassembled”的index与“first unacceptable”的index之间的距离。这被称为“窗口大小”。

同时,ackno和窗口大小描述描述了接收方的窗口:TCP发送方允许发送的index范围。使用这个窗口,接收方可以控制传入的数据流,让发送方限制发送的数量,直到接收方准备接收更多的数据。我们有时把ackno称为窗口的“左边缘”(TCPReceiver感兴趣的最小索引),把ackno +窗口大小称为窗口的“右边缘”(刚好超过TCPReceiver感兴趣的最大索引)。

当你写了StreamReassembler和ByteStream时,你已经完成了实现TCPReceiver所涉及的大部分算法工作;这个实验室是关于将这些一般类连接到TCP的细节。最困难的部分将涉及到考虑TCP将如何表示每个字节在流中的位置——称为“序列号”。

2. Translating between 64-bit indexes and 32-bit seqnos

作为热身,我们需要实现TCP表示索引的方式。上周您创建了一个StreamReassembler,它重组子字符串,其中每个字节都有一个64位流索引,流中的第一个字节总是索引为0。64位索引足够大,我们可以将其视为永不溢出。然而,在TCP报头中,空间是宝贵的,流中的每个字节的索引不是用64位的索引表示的,而是用32位的“序列号”或“seqno”表示的。这增加了三个复杂性:

  1. 您的实现需要为32位整数进行规划:TCP中的流可以是任意长的——对于可以通过TCP发送的字节流的长度没有限制。但是232字节只有4GiB,并不是很大。一旦一个32位的序列号计数到232−1,流中的下一个字节的序列号将为0。
  2. TCP序列号从一个随机值开始:为了提高安全性,并避免被属于同一端点之间早期连接的旧段所混淆,TCP试图确保序列号不会被猜测,并且不太可能重复。所以流的序列号不是从0开始的。流中的第一个序列号是一个随机的32位数字,称为初始序列号(Initial sequence number, ISN)。这是表示SYN(流的开始)的序列号。其余的序列号在此之后正常运行:数据的第一个字节将有ISN+1 (mod 232)的序列号,第二个字节将有ISN+2 (mod 232),等等。
  3. 每个逻辑开始和结束占用一个序列号:除了确保接收到所有字节的数据外,TCP还确保可靠地接收流的开始和结束。因此,在TCP中SYN (start -ofstream)和FIN (end- stream)控制标志被分配了序列号。每一个都占用一个序列号。(SYN标志占用的序列号是ISN。)流中的每个数据字节也占用一个序列号。请记住,SYN和FIN不是流本身的一部分,也不是“字节”——它们表示字节流本身的开始和结束。

这些序列号(seqnos)在每个TCP段的报头中传输。(同样,这里也有两条流,每个方向各一条。每个流都有单独的序列号和不同的随机ISN。)有时讨论“绝对序列号”的概念(它总是从0开始,不换行)和“流索引”(您已经在您的StreamReassembler中使用过:流中每个字节的索引,从0开始)也是有帮助的。

为了使这些区别更具体,考虑只包含三个字母字符串’ cat ‘的字节流。如果SYN刚好有seqno 232−2,则每个字节的seqnos、absolute seqnos和流索引为:

element SYN c a t FIN
seqno 232−2 232−1 0 1 2
absolute seqno 0 1 2 3 4
stream index 0 1 2

其三种index的类型如下:

Sequence Numbers Absolute Sequence Numbers Stream Indices
Start at the ISN Start at 0 Start at 0
Include SYN/FIN Include SYN/FIN Omit SYN/FIN
32 bits, wrapping 64 bits, non-wrapping 64 bits, non-wrapping
“seqno” “absolute seqno” “stream index”

absolute sequence numbers和stream index之间的转换很简单——只需要加或减一。不幸的是,在sequence numbers和absolute sequence numbers之间转换有点困难,混淆两者可能会产生棘手的bug。为了系统地防止这些错误,我们将用一个自定义类型来表示序列号:WrappingInt32,并编写它与absolute sequence numbers(以uint64 t表示)之间的转换。WrappingInt32是一个包装器类型的例子:包含内部类型(在本例中是uint32 t)但提供一组不同的函数/操作符的类型。

解决方案
需要我们完成两个方向的转换:AboSeq->Seq以及Seq->AboSeq。
其中AboSeq->Seq直接利用加法重载,对于unsigned类型,溢出直接归零,不需要单独处理。

1
2
3
4
5
// libsponge\wrapping_integers.cc
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
// 超出直接溢出即可
return WrappingInt32{static_cast<uint32_t>(n) + isn.raw_value()};
}

Seq->AboSeq这个过程需要把新接收到的报文段的seqno(WrappingInt32)转换为64位的index,其结果不唯一,所以需要取与上一次收到的报文段的index(checkpoint)最接近的那个转换结果。
于是,可以先用刚写好的wrap函数把checkpoint变为 WrappingInt32,然后利用第一个减法重载,找出这个转换后的seqno最少需要走几步可以到新报文的seqno,然后把这个步数加到 checkpoint上。如果是往反方向走的,走完结果可能是负数,需要单独处理一下:

1
2
3
4
5
6
7
8
9
// libsponge\wrapping_integers.cc
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
// 找到n和checkpoint之间的最小步数
int32_t min_step = n - wrap(checkpoint, isn);
// 将步数加到checkpoint上
int64_t ret = checkpoint + min_step;
// 如果反着走的话要加2^32
return ret >= 0 ? static_cast<uint64_t>(ret) : ret + (1ul << 32);
}

3. Implementing the TCP receiver

在本实验室的其余部分,您将实现TCPReceiver。它将(1)从它的peer接收段(2)使用你的StreamReassembler重新组装ByteStream(3)计算确认号码(ackno)和窗口大小。确认码和窗口大小最终会在传出的段中传输回对等体。

首先,请回顾TCP段的格式。这是两个端点互相发送的消息;它是低级数据报的有效负载。未显示为灰色的字段表示本实验室感兴趣的信息:序列号、有效负载以及SYN和FIN标志。这些字段由发送方写入,接收方读取并处理。

https://files.epis2048.net/uploads/2022/图像9.png

3.1 segment received()

这是接收器的主要方法,每次结构到新的段时都会调用该方法。该方法需要:

  • 必要时设置初始序列号。设置了SYN标志的第一个开始的段的序列号是初始序列号。为了在32位wrapped的seqnos/acknos和它们的绝对absolute equivalents之间保持转换,您需要跟踪这一点。(注意SYN标志只是头文件中的一个标志。同一段也可以携带数据,甚至可以设置FIN标志。)
  • 将任何数据或流结束标记推到StreamReassembler。如果在TCPSegment的报头中设置了FIN标志,这意味着有效负载的最后一个字节就是整个流的最后一个字节。记住,StreamReassembler期望流索引从0开始;你需要unwrap seqnos老产生这些。

3.2 ackno()

返回一个optional<WrappingInt32>,其中包含接收方不知道的第一个字节的序列号。这是窗口的左边缘:接收方感兴趣接收的第一个字节。如果ISN还没有设置,返回一个空的可选值。

3.3 window size()

返回“first unassembled”索引(对应于ackno的索引)和“first unacceptable”索引之间的距离。

解决方案
对于segment_received(),即接受一个TCP段的方法,根据前文描述完成即可。最终要把数据保存在reassembler_中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// libsponge\tcp_receiver.cc
void TCPReceiver::segment_received(const TCPSegment &seg) {
// 未开始
if (isn_ == nullopt && !seg.header().syn) {
return;
}
// 设置初始序列号
if (seg.header().syn) {
isn_ = seg.header().seqno;
}
// 获取reassembler的index,即Seq->AboSeq
int64_t abo_seqno = unwrap(seg.header().seqno + static_cast<int>(seg.header().syn), isn_.value(), checkpoint_);
// 将任何数据或流结束标记推到StreamReassembler,序号使用Stream Indices,即AboSeq - 1
reassembler_.push_substring(seg.payload().copy(), abo_seqno - 1, seg.header().fin);
// 设置新的checkpoint
checkpoint_ += seg.length_in_sequence_space();
}

ackno()要返回接收方不知道的第一个字节的序列号,即窗口的左边缘,参考该图:
https://files.epis2048.net/uploads/2022/图像8.png

1
2
3
4
5
6
7
8
9
10
11
12
13
// libsponge\tcp_receiver.cc
optional<WrappingInt32> TCPReceiver::ackno() const {
// ISN还未设置
if (isn_ == nullopt)
return nullopt;
// Stream Indices->AboSeq
uint64_t written = stream_out().bytes_written() + 1;
if (stream_out().input_ended()) {
written++;
}
// AboSeq->Seq
return wrap(written, isn_.value());
}

window_size()也需要参考上图,返回“first unassembled”索引(对应于ackno的索引)和“first unacceptable”索引之间的距离。

1
2
3
4
5
// libsponge\tcp_receiver.cc
size_t TCPReceiver::window_size() const {
// first_unacceptable - first_unassembled
return capacity_ - stream_out().buffer_size();
}

此外,在测试过程中,我还发现我上个实验的StreamReassembler有些问题。这里直接使用deque重写数据的保存部分了:

1
2
3
4
5
6
7
8
// libsponge\stream_reassembler.hh
ByteStream output_; //!< The reassembled in-order byte stream
size_t capacity_; //!< The maximum number of bytes
size_t unassembled_bytes_; // 未发送至output_的数据大小
std::deque<char> datas_; // 数据
std::deque<bool> write_flag_; // 该数据位是否已经写入
size_t next_index_; // 下一个要写入的index
bool is_eof_; // 是否收到eof位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// libsponge\stream_reassembler.cc
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
if (index >= next_index_ + output_.remaining_capacity()) {
return;
}
// 设置eof标志
if (eof && index + data.size() <= next_index_ + output_.remaining_capacity()) {
is_eof_ = true;
}
// 如果所有数据都已经保存过了,那就跳过
if (index + data.size() > next_index_) {
// 将数据保存到datas_中,从index和next_index_中更大的一方开始,避免无用循环
for (size_t i = (index > next_index_ ? index : next_index_);
i < next_index_ + output_.remaining_capacity() && i < index + data.size();
i++) {
if (!write_flag_[i - next_index_]) {
datas_[i - next_index_] = data[i - index];
write_flag_[i - next_index_] = true;
unassembled_bytes_++;
}
}
// 将已经有序的数据发送到字节流中
string out_str;
while (write_flag_.front()) {
out_str += datas_.front();
datas_.pop_front();
datas_.push_back('\0');
write_flag_.pop_front();
write_flag_.push_back(false);
}
size_t out_len = out_str.size();
if (out_len > 0) {
unassembled_bytes_ -= out_len;
next_index_ += out_len;
output_.write(out_str);
}
}
// 如果输入结束且所有数据都发送了,则结束字节流
if (is_eof_ && empty()) {
output_.end_input();
}
}

4 Evolution of the TCPReceiver over the life of the connection

在一个TCP连接的过程中,你的TCPReceiver将通过一系列的状态演进:从等待SYN(空的确认),到一个正在进行的流,到一个已经完成的流,这意味着输入已经在ByteStream上结束。测试套件将检查你的TCPReceiver是否正确处理传入的TCPSegments,并通过这些状态演进,如下所示。(在实验4之前,您不必担心错误状态或RST标志。)

https://files.epis2048.net/uploads/2022/图像10.png