0%

Stanford-CS144-Sponge 笔记 - Lab 4: The TCP Connection

第四个实验要求实现一个TCPConnection类,主要功能有:封装TCPSender和TCPReceiver;构建TCP的有限状态机(FSM)。

1. OverView

现在,在实验4中,您将创建一个称为TCPConnection的总体模块,该模块结合了您的TCPSender和TCPReceiver,并处理连接的全局管理。连接的TCP段可以封装到用户(UDP中的TCP)或Internet (TCP/IP)数据报的有效负载中——让您的代码与Internet上使用相同TCP/IP语言的数十亿台其他计算机通信。

一个简短的注意事项:TCPConnection主要是将您在早期实验室中实现的发送方和接收方模块组合在一起——TCPConnection本身可以在不到100行代码中实现。如果你的发送者和接收者都很健壮,这将是一个简短的实验。如果不是,您可能需要在测试失败消息的帮助下花时间调试。(我们建议您不要尝试阅读测试源代码,除非这是最后的手段。)

https://files.epis2048.net/uploads/2022/图像7.png

2. The TCP connection

本周,您将使用您在前四个实验中完成的部分来构建一个TCP的实现。至此,您已经完成了大部分工作:实现了发送方和接收方。您本周的工作是将它们“连接”到一个对象(一个TCPConnection)中,并处理一些全局的维护任务。

回想一下:TCP可靠地传递一对流控制的字节流,每个方向一个。两方参与TCP连接,每一方同时作为“发送方”(自身出站字节流)和“接收方”(自身入站字节流):

https://files.epis2048.net/uploads/2022/图像12.png

这两方(上图中的“A”和“B”)被称为连接的“端点(endpoints)”,或“对等点(peers)”。你的TCPConnection作为对等点的一侧,它负责接收和发送段,确保发送方和接收方被告知并有机会为传入和传出段贡献他们关心的字段。

接收部分。当TCPConnection的段接收方法被调用时,TCPConnection从Internet接收TCPSegments。当发生这种情况时,TCPConnection会查看这个段并:

  • 如果设置了RST (reset)标志,将入站流和出站流都设置为错误状态,并永久终止连接。否则……
  • 把这个段给TCPReceiver,这样它就可以检查传入段上它关心的字段:seqno、syn、payload和fin。
  • 如果设置了ack标志,告诉TCPSender它所关心的传入段上的字段:ackno和window_size。
  • 如果传入的段占用了任何序列号,TCPConnection确保至少发送一个段作为应答,来反馈ackno和window_size的更新。
  • 在TCPConnection的segment_received()方法中,你必须处理一个额外的特殊情况:响应一个“keep-alive”段。对面可能选择发送一个无效的序列号段来查看你的TCP实现是否仍然活着(如果是,那么你当前的窗口是什么)。你的TCPConnection应该回复这些“keep-alive”,即使它们不占用任何序列号。实现此功能的代码如下所示:
    https://files.epis2048.net/uploads/2022/图像13.png

发送部分。TCPConnection将通过Internet发送TCPSegments:

  • 任何时候TCPSender将一个段推入它的出站队列,并设置了它在出站段上负责的字段:(seqno、syn、payload和fin)。
  • 在发送段之前,TCPConnection会向TCPReceiver询问它在传出段中负责的字段:ackno和window size。如果有ackno,它将设置ack标志和TCPSegment中的字段。

当时间的流逝。TCPConnection有一个tick方法,将由操作系统定期调用。当发生这种情况时,TCPConnection需要:

  • 告诉TCPSender时间的流逝。
  • 如果连续重传的次数超过了TCPConfig::MAX_RETX_ATTEMPTS的上限,则终止连接,并向对面发送一个重置段(一个空段,设置了RST标志)。
  • 如果需要的话,干净地结束连接。

因此,每个TCPSegment的整体结构看起来像这样,“发送者写入”和“接收者写入”字段显示为不同的颜色:

https://files.epis2048.net/uploads/2022/图像14.png

TCPConnection的完整接口在类文档中。请花点时间把这个通读一遍。你的大部分实现都涉及到将TCPConnection的公共API“连接”到TCPSender和TCPReceiver中的适当例程。尽可能地将任何繁重的工作推迟到已经实现的发送方和接收方。也就是说,并不是所有事情都那么简单,还有一些微妙之处涉及到整体连接的“全局”行为。最困难的部分将是决定何时完全终止一个TCPConnection并声明它不再“活动”。

2.1 FAQs

  • Q:预估要写多少代码?
  • A:总的来说,我们预计实现(在tcp_connection.cc中)总共需要大约100-150行代码。完成之后,测试套件将广泛测试您与自己的实现以及Linux内核的TCP实现之间的互操作性。
  • Q:我应该如何开始?
  • A:可能最好的开始方法是将一些“普通”方法连接到TCPSender和TCPReceiver中的适当调用。这可能包括remaining_outbound_capacity()、bytes_in_flight()和unassembled_bytes()。
    然后你可以选择实现“writer”方法:connect()、write()和end_input_stream()。其中一些方法可能需要对出站的ByteStream(由TCPSender拥有)做一些事情,并将其告知TCPSender。
    您可以选择在完全实现每个方法之前开始运行测试套件(进行检查);测试失败消息可以为您提供下一步处理的线索或指南。
  • Q:应用程序如何从入站流读取数据?
  • A:TCPConnection::inbound stream()已经在头文件中实现了。您不需要再做任何事情来支持应用程序读取。
  • Q:TCPConnection需要任何奇特的数据结构或算法吗?
  • A:不,真的没有。繁重的工作都是由您已经实现的TCPSender和TCPReceiver完成的。这里的工作实际上就是把所有东西连接起来,处理一些挥之不去的连接范围内的微妙之处,这些微妙之处不容易被发送者和接收者考虑进去。
  • Q:TCPConnection如何实际发送一个段?
  • A:与TCPSender类似——将其推入segment_out队列。就您的TCPConnection而言,只要您将其推入此队列,就可以考虑将其发送。很快所有者就会出现并弹出它(使用公共段out()访问器方法)并真正发送它。
  • Q:TCPConnection如何了解时间的流逝?
  • A:与TCPSender类似,tick()方法将定期调用。请不要用其他任何方式来显示时间——tick()是你了解时间流逝的唯一方法。这保持了事物的确定性和可测试性。
  • Q:如果传入的段设置了RST标志,TCPConnection会做什么?
  • A:这个标志(“重置”)意味着连接立即死亡。如果您接收到带有RST标志的段,您应该在入站和出站的ByteStreams上设置错误标志,并且任何对TCPConnection::active()的后续调用都应该返回false。
  • Q:什么时候我应该发送一个包含RST标志的段?
  • A:在两种情况下,您需要中止整个连接
    1. 如果发送方发送了太多的连续重传而没有成功(多于TCPConfig::MAX RETX ATTEMPTS,例如8)。
    2. 如果在连接仍处于活动状态时调用TCPConnection析构函数(active()返回true)。
      发送一个带有RST标志的段与接收一个类似的效果:连接将会死亡,不再活动(),并且两个字节流都应该被设置为错误状态。
  • Q:等等,但是我怎么生成一个设置RST标志的段?序列号是多少?
  • A:任何传出段都需要有适当的序列号。您可以通过调用它的send_empty_segment()方法来强制TCPSender生成一个具有正确序列号的空段。或者你可以通过调用fill_window()方法让它填充窗口(如果它有未完成的信息要发送,则生成段,例如来自流或SYN/FIN的字节)。
  • Q:ack标志的目的是什么?不是总要用ackno吗?
  • A:几乎每个TCPSegment都有一个ackno,并且设置了ack标志。异常发生在连接的最开始,在接收方有任何需要确认的信息之前。
    在传出的段中,你需要尽可能设置ackno和ack标志。也就是说,每当TCPReceiver的ackno()方法返回一个std::optional<WrappingInt32>,它有一个值,您可以使用has_value()测试。
    在传入的段中,只有设置了ack字段,才需要查看ackno。如果是,将ackno(和window_size)给TCPSender。
  • Q:我如何解读这些“状态”名称(如“stream started”或“stream ongoing”)?
  • A:请看实验二和实验三的图表。
    我们想再次强调,“状态”对于测试和调试非常有用,但我们并不是要求您在代码中实现这些状态。你不需要做更多的状态变量来跟踪它。“状态”只是你的模块已经暴露的公共接口的一个函数。
  • Q:如果TCPReceiver想要公布一个大于TCPSegment::header().win字段的窗口大小,我应该发送多大的window_size?
  • A:尽你所能传递最大的价值。您可能会发现std::numeric_limits类很有帮助。
  • Q:什么时候TCP连接最终“完成”?什么时候active()可以返回false?
  • A:请参阅下一节

2.2 The end of a TCP connection: consensus takes work

TCPConnection的一个重要功能是决定TCP连接何时完全“完成”。当发生这种情况时,我们将释放它对本地端口号的占用,停止向传入的段发送应答确认,将连接视为历史,并让它的active()方法返回false。

连接有两种结束方式。在不正常关闭时,TCPConnection发送或接收一个设置了RST标志的段。在这种情况下,出站和入站的ByteStreams都应该处于错误状态,并且active()可以立即返回false。

干净关闭是我们如何做到“done”(active() = false)而不出错的方法。这更复杂,但它是一件美好的事情,因为它尽可能确保两个字节流的每个都已可靠地完全交付给接收端。在下一节中,我们将给出当彻底关闭发生时的实际结果。

由于两军问题,不可能保证两个对等体都能实现干净的关闭,但TCP非常接近。这是如何。从一个peer的角度来看(一个TCPConnection,我们称它为“本地”peer),有四个先决条件来在它与“远程”peer的连接中有一个干净的关闭:

  1. 入站流已全部组装完毕。
  2. 出站流已经被本地应用程序结束,并完全发送(包括它结束的事实,即一个带有fin的段)到远端对等端。
  3. 出流已被远端对等体完全确认。
  4. 本地TCPConnection有信心远端能够满足条件#3。这是让人绞尽脑汁的部分。有两种可能发生这种情况:
    可能A:在两个流结束后徘徊。前提条件#1到#3是正确的,并且远端对等体似乎已经得到了本地对等体对整个流的确认。本地对等端并不知道这一点,因为tcp不可靠地交付ack(它不ack)。但是本地对等体非常确信远端对等体已经得到了它的ack,因为远端对等体似乎没有重传任何东西,而本地对等体已经等待了一段时间来确认。
    具体来说,当满足先决条件#1到#3,并且它至少是初始重传超时的10倍(cfg。Rt超时),因为本地对等体收到了来自远端对等体的任何段。这被称为两个流结束后的“滞留”,以确保远端对等端没有尝试重新传输任何我们需要确认的内容。它确实意味着一个TCPConnection需要存活一段时间,保持本地端口号的独占声明,并可能发送ack来响应进入的段,即使在TCPSender和TCPReceiver完全完成它们的工作,并且两个流都结束之后。
    可能B:被动关闭。前提条件#1到#3为真,并且本地对等体100%确定远端对等体能够满足前提条件#3。如果TCP不承认确认,这怎么可能呢?因为远端对等端是第一个结束其流的。
    底线是,如果TCPConnection的入站流在TCPConnection发送fin段之前结束,那么TCPConnection不需要在两个流结束后都停留。

实际上,这意味着您的TCPConnection有一个名为linger_after_streams_finish的成员变量,该变量通过state()方法暴露给测试设备。变量开始时为true。如果入站流在TCPConnection到达出站流的EOF之前结束,这个变量需要设置为false。

在任何满足条件#1到#3的点上,如果linger_after_streams_finish为false,则连接为“完成”(并且active()应该返回false)。否则你需要逗留:只有在足够的时间(10 × cfg.Rt_timeout)自接收到最后一个段以来已经过去。

解决方案
主要还是参考了Smith老哥的一些实现。
TCP状态方面,主要参考http://ttcplinux.sourceforge.net/documents/one/tcpstate/tcpstate.htmlhttps://www.jianshu.com/p/3c7a0771b67ehttps://cloud.tencent.com/developer/news/646106
添加并修改了一些成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// libsponge\tcp_connection.hh
private:
TCPConfig cfg_;
TCPReceiver receiver_{cfg_.recv_capacity};
TCPSender sender_{cfg_.send_capacity, cfg_.rt_timeout, cfg_.fixed_isn};

//! outbound queue of segments that the TCPConnection wants sent
std::queue<TCPSegment> segments_out_{};

//! Should the TCPConnection stay active (and keep ACKing)
//! for 10 * cfg_.rt_timeout milliseconds after both streams have ended,
//! in case the remote TCPConnection doesn't know we've received its whole stream?
bool linger_after_streams_finish_{true};

size_t time_since_last_segment_received_ = 0;
bool active_ = true;

void send_data();
void reset_connection();

各个函数的实现如下,过程基本都在注释里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// libsponge\tcp_connection.cc
size_t TCPConnection::remaining_outbound_capacity() const { return sender_.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return sender_.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return receiver_.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return time_since_last_segment_received_; }

void TCPConnection::segment_received(const TCPSegment &seg) {
// 非启动时不接收
if (!active_) {
return;
}

// 重置连接时间
time_since_last_segment_received_ = 0;

// RST标志,直接关闭连接
if (seg.header().rst) {
// 在出站入站流中标记错误,使active返回false
receiver_.stream_out().set_error();
sender_.stream_in().set_error();
active_ = false;
}
// 当前是Closed/Listen状态
else if (sender_.next_seqno_absolute() == 0) {
// 收到SYN,说明TCP连接由对方启动,进入Syn-Revd状态
if (seg.header().syn) {
// 此时还没有ACK,所以sender不需要ack_received
receiver_.segment_received(seg);
// 我们主动发送一个SYN
connect();
}
}
// 当前是Syn-Sent状态
else if (sender_.next_seqno_absolute() == sender_.bytes_in_flight() && !receiver_.ackno().has_value()) {
if (seg.header().syn && seg.header().ack) {
// 收到SYN和ACK,说明由对方主动开启连接,进入Established状态,通过一个空包来发送ACK
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
sender_.send_empty_segment();
send_data();
} else if (seg.header().syn && !seg.header().ack) {
// 只收到了SYN,说明由双方同时开启连接,进入Syn-Rcvd状态,没有接收到对方的ACK,我们主动发一个
receiver_.segment_received(seg);
sender_.send_empty_segment();
send_data();
}
}
// 当前是Syn-Revd状态,并且输入没有结束
else if (sender_.next_seqno_absolute() == sender_.bytes_in_flight() && receiver_.ackno().has_value() &&
!receiver_.stream_out().input_ended()) {
// 接收ACK,进入Established状态
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
}
// 当前是Established状态,连接已建立
else if (sender_.next_seqno_absolute() > sender_.bytes_in_flight() && !sender_.stream_in().eof()) {
// 发送数据,如果接到数据,则更新ACK
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
if (seg.length_in_sequence_space() > 0) {
sender_.send_empty_segment();
}
sender_.fill_window();
send_data();
}
// 当前是Fin-Wait-1状态
else if (sender_.stream_in().eof() && sender_.next_seqno_absolute() == sender_.stream_in().bytes_written() + 2 &&
sender_.bytes_in_flight() > 0 && !receiver_.stream_out().input_ended()) {
if (seg.header().fin) {
// 收到Fin,则发送新ACK,进入Closing/Time-Wait
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
sender_.send_empty_segment();
send_data();
} else if (seg.header().ack) {
// 收到ACK,进入Fin-Wait-2
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
send_data();
}
}
// 当前是Fin-Wait-2状态
else if (sender_.stream_in().eof() && sender_.next_seqno_absolute() == sender_.stream_in().bytes_written() + 2 &&
sender_.bytes_in_flight() == 0 && !receiver_.stream_out().input_ended()) {
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
sender_.send_empty_segment();
send_data();
}
// 当前是Time-Wait状态
else if (sender_.stream_in().eof() && sender_.next_seqno_absolute() == sender_.stream_in().bytes_written() + 2 &&
sender_.bytes_in_flight() == 0 && receiver_.stream_out().input_ended()) {
if (seg.header().fin) {
// 收到FIN,保持Time-Wait状态
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
sender_.send_empty_segment();
send_data();
}
}
// 其他状态
else {
sender_.ack_received(seg.header().ackno, seg.header().win);
receiver_.segment_received(seg);
sender_.fill_window();
send_data();
}
}

bool TCPConnection::active() const { return active_; }

size_t TCPConnection::write(const string &data) {
if (data.empty()) {
return 0;
}

// 在sender中写入数据并发送
size_t size = sender_.stream_in().write(data);
sender_.fill_window();
send_data();
return size;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
// 非启动时不接收
if (!active_) {
return;
}

// 保存时间,并通知sender
time_since_last_segment_received_ += ms_since_last_tick;
sender_.tick(ms_since_last_tick);

// 超时需要重置连接
if (sender_.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
reset_connection();
return;
}
send_data();
}

void TCPConnection::end_input_stream() {
sender_.stream_in().end_input();
sender_.fill_window();
send_data();
}

void TCPConnection::connect() {
// 主动启动,fill_window方法会发送Syn
sender_.fill_window();
send_data();
}

void TCPConnection::send_data() {
// 将sender中的数据保存到connection中
while (!sender_.segments_out().empty()) {
TCPSegment seg = sender_.segments_out().front();
sender_.segments_out().pop();
// 尽量设置ackno和window_size
if (receiver_.ackno().has_value()) {
seg.header().ack = true;
seg.header().ackno = receiver_.ackno().value();
seg.header().win = receiver_.window_size();
}
segments_out_.push(seg);
}
// 如果发送完毕则结束连接
if (receiver_.stream_out().input_ended()) {
if (!sender_.stream_in().eof()) {
linger_after_streams_finish_ = false;
}

else if (sender_.bytes_in_flight() == 0) {
if (!linger_after_streams_finish_ || time_since_last_segment_received() >= 10 * cfg_.rt_timeout) {
active_ = false;
}
}
}
}

void TCPConnection::reset_connection() {
// 发送RST标志
TCPSegment seg;
seg.header().rst = true;
segments_out_.push(seg);

// 在出站入站流中标记错误,使active返回false
receiver_.stream_out().set_error();
sender_.stream_in().set_error();
active_ = false;
}

TCPConnection::~TCPConnection() {
try {
if (active()) {
cerr << "Warning: Unclean shutdown of TCPConnection\n";

// Your code here: need to send a RST segment to the peer
reset_connection();
}
} catch (const exception &e) {
std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}

最终结果如下:
https://files.epis2048.net/uploads/2022/图像15.png
最后,不知道为什么,官方给的VirtualBox的镜像时钟问题很大,需要经常执行sudo ntpdate cn.pool.ntp.org