0%

CMU-15445-BusTub 笔记 - Lab 3: Query Execution

第三个项目要求实现一个执行器,不需要解析SQL。

实验指导书

第三个项目中,您将向使您的数据库系统支持查询执行。您将实现负责获取查询计划节点并执行他们的执行器,该执行器需要执行以下操作:

  1. 存取方法:Sequential Scan(顺序扫描)
  2. 修改类型:Insert, Update, Delete
  3. 其他:Nested Loop Join(循环嵌套join), Hash Join, Aggregation, Limit, Distinct

这时DBMS还不支持SQL,所以您的实现将直接在手工编写的查询计划上运行。

我们将使用迭代器模型(又称火山模型)。在该模型中,每个查询计划执行器都实现了一个Next函数。当DBMS调用执行器的Next函数时,执行器返回单个记录或返回没有记录了。通过这种方法,每个执行器实现一个循环,该循环继续调用其子对象的Next函数来检索记录并逐个处理他们。

在BusTub的迭代器模型实现中,每个执行的的Next函数除了返回一个记录外,还返回一个记录标识符(Record Identifier)。该记录标识符是该记录相对于其所鼠标的唯一标识符。

前言

在本任务中,您将实现九个执行器。对于每种查询计划运算符类型,都有一个相应的执行器对象来实现Init和Next方法。Init方法初始化操作符的内部状态(如检索要扫表的对应表)。Next方法提供迭代器的接口,该接口在每次调用时返回一个记录和对应的RID(也可能是执行器已经结束的标志)。

您将实现的执行的的header文件如下:

1
2
3
4
5
6
7
8
9
src/include/execution/executors/seq_scan_executor.h
src/include/execution/executors/insert_executor.h
src/include/execution/executors/update_executor.h
src/include/execution/executors/delete_executor.h
src/include/execution/executors/nested_loop_join_executor.h
src/include/execution/executors/hash_join_executor.h
src/include/execution/executors/aggregation_executor.h
src/include/execution/executors/limit_executor.h
src/include/execution/executors/distinct_executor.h

其他文件不需要实现。

每个执行器负责处理单个的Plan Node类型。Plan Node是组成查询计划的各个元素。每个Plan Node可以定义他所代表的操作符的特定信息。如:顺序扫描的Plan Node必须为执行扫描的表定义标识符,而Limit的Plan Node不需要此信息。这些Plan Node已经在如下头文件中定义:

1
2
3
4
5
6
7
8
9
src/include/execution/plans/seq_scan_plan.h
src/include/execution/plans/insert_plan.h
src/include/execution/plans/update_plan.h
src/include/execution/plans/delete_plan.h
src/include/execution/plans/nested_loop_join_plan.h
src/include/execution/plans/hash_join_plan.h
src/include/execution/plans/aggregation_plan.h
src/include/execution/plans/limit_plan.h
src/include/execution/plans/distinct_plan.h

这些Plan Node已经拥有了实现所需执行器的全部数据和功能,您不应该修改他们。

执行器从他们的子对象(可能是多个)中得到记录,并将记录交给父对象。他们可能以来其子对象的顺序,我们下文将会介绍。您还可以根据需要自由添加私有的Helper Function和类成员。

对于这个实验,我们假设执行器在单线程的上下文中执行。当多个线程并发执行时,不需要采取额外的措施。

我们将提供ExecutionEngine这个Helper类。他将输入的查询计划转换为查询执行器并执行他,直到生成所有结果。您必须修改ExecutionEngine来捕获由于查询执行过程中的failures而引发的异常。正确处理执行引擎中的failures对未来实验的成功非常重要。

要了解执行查询期间如何在运行时创建执行器,请参考ExecutorFactory类。此外,每个执行器都可以访问它执行的ExecutorContext。

其中一些执行器会对表进行一些修改(插入、更新、删除)。为了保持与底层表一致的表索引,这些执行器还需要更新被修改的表的所有索引。您将使用上一个实验中的可扩展哈希表作为此实验中所有索引的底层数据结构。

最后,在test/execution/executor_test.cpp文件中,我们提供了一些测试。这些测试远远算不上详尽,但他们可以告诉您在为执行器编写测试时从何处开始。

实验内容

Sequential Scan

SeqScanExecutor遍历表并一次返回一条记录。顺序扫描由SeqScanPlanNode指定。Plan Node指定被扫描的表,它还包含一个predicate,如果该记录不满足predicate,则扫描不会生成记录。

提示:使用TableIterator对象时要注意,++iter和iter++之间的切换会得到奇怪的输出。

提示:您需要在顺序扫描的Plan Node中使用predicate。特别是,注意AbstractExpression::Evaluate,它会返回一个值,在该值上可以调用GetAs<bool>()来将结果作为bool类型来使用。

提示:顺序扫描的结果是每条匹配记录及其RID的副本。

Insert

InsertExecutor将记录插入表中并更新索引。您的执行器需要支持两种插入类型。

第一种插入操作中要被插入的值直接嵌入到Plan Node中,我们称之为原始插入。(Insert into ... Values (...)

第二种插入操作要被插入的值来自于子执行器。例如,您现在有一个InserPlanNode,其中SeqScanPlanNode作为子节点来实现插入。(Insert into ... Select ...

你可以假设InsertExecutor始终位于其查询计划的根目录下,InsertExecutor不应该修改result set。

提示:在执行器初始化的时候需要查找插入目标的表信息。有关访问catalog的更多信息,参见后文System Catalog部分。

提示:您需要更新有关被插入的记录的所有索引。更多信息参考后文Index Updates部分。

提示:您需要使用TableHeap类来执行表修改。

Update

UpdateExecutor修改指定表中的现有记录并更新其索引。子执行器将提供Update执行器需要修改的记录及其RID。

和InsertExecutor不同,UpdateExecutor总是从子执行器中提取记录并更新。例如,UpdatePlanNode将有一个SqcScanPlanNode作为其子节点。

您可以假定UpdateExecutor始终位于其查询计划的根目录下,UpdateExecutor不应该修改result set。

提示:我们提供GenerateUpdatedTuple,它基于Plan Node中提供的Update列为您构建一条Updated的记录。

提示:在执行器初始化的时候需要查找插入目标的表信息。有关访问catalog的更多信息,参见后文System Catalog部分。

提示:您需要更新有关被插入的记录的所有索引。更多信息参考后文Index Updates部分。

提示:您需要使用TableHeap类来执行表修改。

Delete

DeleteExecutor从表中删除记录,并从所有表的索引中删除与其有关的项。和Update一样,要删除的记录也从子执行器(如SeqScanExecutor)中得到。

您可以假定DeleteExecutor始终位于其查询计划的根目录下,DeleteExecutor不应该修改result set。

提示:您只需要从子执行器中得到RID然后调用TableHeap::MarkDelete()即可有效的删除记录。所有删除将在事务被提交时应用。

提示:您需要更新有关被插入的记录的所有索引。更多信息参考后文Index Updates部分。

Nested Loop Join

NestedLoopJoinExecutor实现了一个基本的嵌套循环联接,他将两个子执行器中的记录组合在一起。

该执行器应实现本课程中介绍的简单嵌套循环连接算法。也就是说,对于联接的外部表中的每行记录,应考虑内部表中的每行记录。如果满足联接谓词,则应返回记录。

本实验的学习目标之一是让您了解逻辑运算符的不同物理实现的优缺点。因此,NestedLoopJoinExecutor实现嵌套循环联接的算法非常重要。例如,不要使用您后面实现的Hash Join执行器。为此,我们将测试NestedLoopJoinExecutor的IO开销,以确定它是否正确实现了算法。

提示,您需要使用嵌套循环联接的Plan Node中的predicate。特别是,注意处理左记录和右记录以及各自的模式的AbstractExpression::Evaluate,它会返回一个值,在该值上可以调用GetAs<bool>()来将结果作为bool类型来使用。

Hash Join

HashJoinExecutor实现了一个哈希连接操作,该操作将来自其两个子执行器的记录组合在一起。

顾名思义,哈希连接实在哈希表的帮助下实现的。出于本实验的目的,我们简化的哈希表,使其完全适合内存。这意味着您不必担心在视线中将build-side表的临时划分溢出到硬盘。

与NestedLoopJoinExecutor一样,我们将测试HashJoinExecutor的IO开销,以确定它是否正确实现了哈希连接算法。

提示:您的实现应该正确处理多个记录(在连接的任一侧)共享一个common join key的情况。

提示:HashJoinPlanNode定义了GetLeftJoinKey()和GetRightJoinKey()两个成员函数。您应该使用这些访问器返回的表达式来分别为连接的左侧和右侧构造join key。

提示:您需要一种方法来散列具有多个列的记录,以便构造其Unique Key。首先,看看AggregationExecutor中的SimpleAggregationHashTable是如何实现此功能的。

提示:回想一下,在查询计划的上下文中,hash join的生成端是一个pipeline breaker。这可能会影响您使用HashJoinExecutor::Init()和HashJoinExecutor::Next()的方式。特别的,考虑是在HashJoinExecutor::Init()还是HashJoinExecutor::Next()中执行连接的构建阶段。

Aggregation

此执行器将来自单个子执行器的多条记录的结果合并到一条记录中。在这个实验中,您需要实现Count,Sum,Min和Max。AggregationExecutor还必须支持Group By和Having子句。

正如讲义中所说,实现聚合的常见策略是使用哈希表,这是您将在本实验中使用的方法。但是我们简化了一些步骤,即聚合哈希表完全适合内存,这意味着您不必担心试下哈希聚合的两阶段(Partition,Rehash)策略,而是可以假设所有聚合结果都可以保留在内存哈希表中。

此外,我们还提供了SimpleAggregationHashTable数据结构。该结构公开了内存中的哈希表(std::unordered_map),但还提供了一个用于计算聚合的结构。这个类还公开了SimpleAggregationHashTable::Iterator的类型,该类型可用于遍历哈希表。

提示:您需要聚合结果并使用Having用于约束。特别的,看一下AbstractExpression::EvaluateAggregate,它处理不用类型表达式的聚合计算,它会返回一个值,在该值上可以调用GetAs<bool>()来将结果作为bool类型来使用。

提示:回想一下,在查询计划的上下文中,聚合是一个pipeline breaker。这可能会影响您使用AggregationExecutor::Init()和AggregationExecutor::Next()的方式。特别的,考虑是在AggregationExecutor::Init()还是AggregationExecutor::Next()中执行连接的构建阶段。

Limit

LimitExecutor约束其子执行器输出的记录行数。如果其子执行器生成的记录数小于LimitPlanNode中指定的限制,则此执行器无效,并返回其接收到的所有记录。

Distinct

DistinctExecutor消除从其子执行器得到的重复记录。在唯一确定的情况下,您的DistinctExecutor应该考虑输入元组的所有列。

与聚合一样,不同的运算符通常在哈希表的帮助下实现(这是一个哈希不同的操作)。这是您在本实验中将会使用到的方法。和AggregationExecutor与HashJoinExecutor一样,您可以假设用于实现DistinctExecutor的哈希表完全适合内存。

提示:为了构造一个唯一的键,您需要一种方法来散列可能包含很多列的记录。首先,看看AggregationExecutor中的SimpleAggregationHashTable是如何实现此功能的。

一些额外的信息

System Catalog

数据库维护一个内部目录,以跟踪有关数据库的元数据。在本实验中,您将与系统目录交互,以查询有关表、索引及其模式的信息。

整个目录实现都在src/include/catalog.h中。您应该特别注意成员函数Catalog::GetTable()和Catalog::GetIndex(),您将在执行器的实现中使用这些函数来查询目录中的表和索引。

Index Updates

对于表修改执行器(InsertExecutor、UpdateExecutor和DeleteExecutor),您可能会发现Catalog::GetTableIndexes()函数对于查询为特定表定义的所有索引非常有用。一旦为每个表的索引创建了IndexInfo实例,就可以对基础索引结构调用索引修改操作。

在本实验中,我们使用上一个实验中可扩展哈希表的实现作为所有索引操作的底层数据结构。因此,该实验的成功完成依赖于可扩展哈希表的工作实现。

实验过程

Executor

在实现之前,先看看提供的类的内容。

ExecutionEngine类用于执行查询计划,其中只有一个Execute函数。该函数先使用ExecutorFactory类创建了一个执行器,并对执行器初始化,然后循环调用执行器的Next函数,并将结果插入到result set中。

ExecutorFactory类就是根据Node Plan去创建执行器。

而到了每个具体的执行器类中,有一个Init函数和一个Next函数。Next函数返回了一个Bool类型的变量,供ExecutionEngine判断要不要继续执行。

在storage/table/中还有一个TableHead类和一个TableIterator类。根据注释,TableHeap类代表了磁盘中的一个物理表,其本质是一个pages组成的双向链表。TableIterator类提供对TableHeap进行顺序遍历的方法,其只存储了当前一条记录。其重载了一些基本符号,有:==, !=, ++, *, ->, =。

Schema类本质上是Column类的集合,即一个表中的所有列。

Column类和Tuple类分别分列和行,其中Tuple类不存储Schema。

Sequential Scan

这个部分用于顺序遍历整张表。由于一次Next函数只返回一条记录,故这里要存储一下TableHeap和TableIterator。

1
2
3
4
5
6
// include\execution\executors\seq_scan_executor.h
private:
/** The sequential scan plan node to be executed */
const SeqScanPlanNode *plan_;
TableHeap *table_heap_;
TableIterator iter_;

初始化的过程中获取当前的TableHeap,并将TableIterator初始化到Begin即可。

1
2
3
4
5
// execution\seq_scan_executor.cpp
void SeqScanExecutor::Init() {
table_heap_ = exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid())->table_.get();
iter_ = table_heap_->Begin(exec_ctx_->GetTransaction());
}

然后就是Next函数。在该函数中,要筛选被返回的列(Select … from)以及筛选要返回的行(from…)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// execution\seq_scan_executor.cpp
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
// 遍历完了返回false
if (iter_ == table_heap_->End()) {
return false;
}

// 获取RID和要返回的列
RID original_rid = iter_->GetRid();
const Schema *output_schema = plan_->OutputSchema();

// 筛选哪些列要被返回
std::vector<Value> vals;
vals.reserve(output_schema->GetColumnCount());
for (size_t i = 0; i < vals.capacity(); i++) {
vals.push_back(output_schema->GetColumn(i).GetExpr()->Evaluate(
&(*iter_), &(exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid())->schema_)));
}

// 迭代器+1
++iter_;

// 构造要返回的行
Tuple temp_tuple(vals, output_schema);

// 看看改行符不符合条件,符合则返回,不符合就继续找下一行
const AbstractExpression *predict = plan_->GetPredicate();
if (predict == nullptr || predict->Evaluate(&temp_tuple, output_schema).GetAs<bool>()) {
*tuple = temp_tuple;
*rid = original_rid;
return true;
}
return Next(tuple, rid);
}

Insert

这里需要实现插入部分。根据实验指导书,需要分别判断是直接插入(Insert into Values(…))还是从一个子查询里插入(Insert into … Select …)。先修改下头文件,增加几个常用的变量,后面可以减少代码长度。

1
2
3
4
5
6
7
8
// include\execution\executors\seq_scan_executor.h
private:
/** The insert plan node to be executed*/
const InsertPlanNode *plan_;
Catalog *catalog_;
TableHeap *table_heap_;
TableInfo *table_info_;
std::unique_ptr<AbstractExecutor> child_executor_;

然后是Init函数,这里直接初始化头文件中新增的几个变量即可:

1
2
3
4
5
6
// execution\insert_executor.cpp
void InsertExecutor::Init() {
catalog_ = exec_ctx_->GetCatalog();
table_info_ = catalog_->GetTable(plan_->TableOid());
table_heap_ = table_info_->table_.get();
}

根据说明,无论是直接插入还是从子查询中插入,我们都需要一行一行的插入数据并更新索引。我们把该过程提取出来,成为一个单独的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// execution\insert_executor.cpp
void InsertExecutor::InsertIntoTableWithIndex(Tuple cur_tuple) {
RID cur_rid;

// 调用table_heap,插入记录
if (!table_heap_->InsertTuple(cur_tuple, &cur_rid, exec_ctx_->GetTransaction())) {
throw Exception(ExceptionType::OUT_OF_MEMORY, "InsertExecutor:no enough space for this tuple.");
}

// 更新索引
for (const auto &index : catalog_->GetTableIndexes(table_info_->name_)) {
index->index_->InsertEntry(
cur_tuple->KeyFromTuple(table_info_->schema_, *index->index_->GetKeySchema(), index->index_->GetKeyAttrs()),
cur_rid, exec_ctx_->GetTransaction());
}
}

然后就可以实现Next函数了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// execution\insert_executor.cpp
bool InsertExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
// 先判断有没有子计划,如果没有的话直接插入即可
if (plan_->IsRawInsert()) {
for (const auto &row_value : plan_->RawValues()) {
InsertIntoTableWithIndex(Tuple(row_value, &(table_info_->schema_)));
}
return false;
}

// 有的话先执行子计划,仿照ExecutionEngine即可
std::vector<Tuple> child_tuples;
child_executor_->Init();
try {
Tuple tuple;
RID rid;
while (child_executor_->Next(&tuple, &rid)) {
child_tuples.push_back(tuple);
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "InsertExecutor:child execute error.");
return false;
}

// 将子计划的结果插入
for (auto &child_tuple : child_tuples) {
InsertIntoTableWithIndex(child_tuple);
}
return false;
}

Update

接着来实现Update语句的功能。头文件中给了一个table_info_变量,故我们要在Init()中初始化他。Update需要我们完成的不多,故不需要再增加其他成员变量了。

1
2
3
4
5
// execution\update_executor.cpp
void UpdateExecutor::Init() {
table_info_ = exec_ctx_->GetCatalog()->GetTable(plan_->TableOid());
child_executor_->Init();
}

然后实现Next函数。这里已经给好了一个函数GenerateUpdatedTuple(const Tuple &src_tuple),用于更新一条记录。我们只需要遍历子查询的结果然后调用该函数获得新记录并使用TableHeap中的UpdateTuple更新记录即可。这里还要处理下子查询的错误,记得catch住之后往上抛。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// execution\update_executor.cpp
bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple old_tuple;
Tuple new_tuple;
RID tuple_rid;
while (true) {
// 执行子查询
try {
if (!child_executor_->Next(&old_tuple, &tuple_rid)) {
break;
}
} catch (Exception &e) { // 接住Exception接着往上抛
throw Exception(ExceptionType::UNKNOWN_TYPE, "UpdateExecutor:child execute error.");
return false;
}

// 更新记录
new_tuple = GenerateUpdatedTuple(old_tuple);
TableHeap *table_heap = table_info_->table_.get();
table_heap->UpdateTuple(new_tuple, tuple_rid, exec_ctx_->GetTransaction());
}
return false;
}

Delete

接着是Delete,这个也不难,根据指导书的要求实现即可。头文件中保存一个table_info_成员变量即可,Init()函数中要初始化他:

1
2
3
4
5
// execution\delete_executor.cpp
void DeleteExecutor::Init() {
table_info_ = exec_ctx_->GetCatalog()->GetTable(plan_->TableOid());
child_executor_->Init();
}

然后是Next函数,这里仍然是根据子查询器结果进行删除,同时要记得更新索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// execution\delete_executor.cpp
bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple del_tuple;
RID del_rid;
while (true) {
// 执行子查询器,catch异常然后接着抛
try {
if (!child_executor_->Next(&del_tuple, &del_rid)) {
break;
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "DeleteExecutor:child execute error.");
return false;
}

// 根据子查询器的结果来调用TableHeap标记删除状态
TableHeap *table_heap = table_info_->table_.get();
table_heap->MarkDelete(del_rid, exec_ctx_->GetTransaction());

// 还要更新索引
for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) {
index_info->DeleteEntry(
del_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), del_rid,
exec_ctx_->GetTransaction());
}
}
return false;
}

Nested Loop Join

这是一个循环Join的实现,即计算笛卡尔积并做筛选。这里改在Init中计算出Join的结果,然后Next读取结果。故需要修改类,使之存储运算结果。

1
2
3
4
5
6
7
8
9
// include\execution\executors\nested_loop_executor.h
private:
/** The NestedLoopJoin plan node to be executed. */
const NestedLoopJoinPlanNode *plan_;

std::unique_ptr<AbstractExecutor> left_executor_;
std::unique_ptr<AbstractExecutor> right_executor_;
std::vector<Tuple> result_;
uint32_t now_id_ = 0;

先实现Init函数,该函数先计算出两侧的结果,然后遍历结果,将匹配的行Join在一起,并将结果保存在result_中。注意,这里不能先分别读取两个表并将结果保存在vector中,必须在内层循环重复读取一遍。不然IO测试过不去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// execution\nested_loop_executor.cpp
void NestedLoopJoinExecutor::Init() {
// 定义些变量
Tuple left_tuple;
RID left_rid;
Tuple right_tuple;
RID right_rid;

// 分别循环执行查询
left_executor_->Init();
while (left_executor_->Next(&left_tuple, &left_rid)) {
right_executor_->Init();
while (right_executor_->Next(&right_tuple, &right_rid)) {
// 计算连接条件
if (plan_->Predicate() == nullptr || plan_->Predicate()
->EvaluateJoin(&left_tuple, left_executor_->GetOutputSchema(),
&right_tuple, right_executor_->GetOutputSchema())
.GetAs<bool>()) {
// 计算要输出的列
std::vector<Value> output;
for (const auto &col : GetOutputSchema()->GetColumns()) {
output.push_back(col.GetExpr()->EvaluateJoin(&left_tuple, left_executor_->GetOutputSchema(), &right_tuple,
right_executor_->GetOutputSchema()));
}
result_.emplace_back(Tuple(output, GetOutputSchema()));
}
}
}
}

而后Next函数则简单许多,每调用一次Next返回一个结果即可。

1
2
3
4
5
6
7
8
9
// execution\nested_loop_executor.cpp
bool NestedLoopJoinExecutor::Next(Tuple *tuple, RID *rid) {
if (now_id_ < result_.size()) {
*tuple = result_[now_id_];
now_id_++;
return true;
}
return false;
}

Hash Join

这里有点问题,指导书里说HashJoinPlanNode有两个函数:GetLeftJoinKey()和GetRightJoinKey(),但是我没找到,所以需要我们自己实现哈希方法。参考Distinct,还是需要修改头文件。与Distinct不同,这里哈希只需要散列一列即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// include\execution\executors\hash_join_executor.h
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/util/hash_util.h"
#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
#include "execution/expressions/abstract_expression.h"
#include "execution/plans/hash_join_plan.h"
#include "storage/table/tuple.h"

namespace bustub {
struct HashJoinKey {
/** The group-by values */
Value column_value_;

/**
* Compares two aggregate keys for equality.
* @param other the other aggregate key to be compared with
* @return `true` if both aggregate keys have equivalent group-by expressions, `false` otherwise
*/
bool operator==(const HashJoinKey &other) const {
return column_value_.CompareEquals(other.column_value_) == CmpBool::CmpTrue;
}
};
} // namespace bustub

namespace std {
/** Implements std::hash on AggregateKey */
template <>
struct hash<bustub::HashJoinKey> {
std::size_t operator()(const bustub::HashJoinKey &agg_key) const {
size_t curr_hash = 0;
if (!agg_key.column_value_.IsNull()) {
curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&agg_key.column_value_));
}
return curr_hash;
}
};
} // namespace std

根据提示,我们决定还是在Init中求出所有的结果,在Next中直接查看即可。所以我们还需要在类的成员中保存左右两个子执行器、散列结果、查询结果以及查询遍历的位置。修改头文件内私有变量如下:

1
2
3
4
5
6
7
8
9
// include\execution\executors\hash_join_executor.h
private:
/** The NestedLoopJoin plan node to be executed. */
const HashJoinPlanNode *plan_;
std::unique_ptr<AbstractExecutor> left_child_executor_;
std::unique_ptr<AbstractExecutor> right_child_executor_;
std::unordered_map<HashJoinKey, std::vector<Tuple>> map_;
std::vector<Tuple> result_;
uint32_t now_id_ = 0;

接着实现Init函数,这部分需要先将左表的内容散列到哈希表里,然后再遍历右表,构造出Join结果。

需要注意的是,对于两张表而言,符合同一哈希结果的行可能有多个。故我选择在哈希表中保存由Tuple构成的vector,来存储左表中同一哈希结果的多行数据。右表则可以在遍历过程中解决,不需要重复存储。

总体思路是先将左表中的全部列根据join条件散列到一张哈希表中,然后遍历右表,右表中的每行数据都要根据join条件的哈希结果在之前得到的哈希表中找到所有行,并一一组合。这样得到的结果才能是笛卡尔积。Init函数的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// include\execution\hash_join_executor.cpp
void HashJoinExecutor::Init() {
left_child_executor_->Init();
right_child_executor_->Init();
// 通过左侧查询构造哈希
Tuple left_tuple;
RID left_rid;
while (left_child_executor_->Next(&left_tuple, &left_rid)) {
// 构造Key
HashJoinKey dis_key;
dis_key.column_value_ =
plan_->LeftJoinKeyExpression()->Evaluate(&left_tuple, left_child_executor_->GetOutputSchema());
// 重复的数据要额外保存
if (map_.count(dis_key) != 0) {
map_[dis_key].emplace_back(left_tuple);
} else {
map_[dis_key] = std::vector{left_tuple};
}
}
// 遍历右侧查询,得到查询结果
Tuple right_tuple;
RID right_rid;
while (right_child_executor_->Next(&right_tuple, &right_rid)) {
// 构造Key
HashJoinKey dis_key;
dis_key.column_value_ =
plan_->RightJoinKeyExpression()->Evaluate(&right_tuple, right_child_executor_->GetOutputSchema());
// 遍历每一个对应的左侧查询
if (map_.count(dis_key) != 0) {
for (auto &left_tuple_new : map_.find(dis_key)->second) {
std::vector<Value> output;
for (const auto &col : GetOutputSchema()->GetColumns()) {
output.push_back(col.GetExpr()->EvaluateJoin(&left_tuple_new, left_child_executor_->GetOutputSchema(),
&right_tuple, right_child_executor_->GetOutputSchema()));
}
result_.emplace_back(Tuple(output, GetOutputSchema()));
}
}
}
}

最后Next函数的实现与Distinct一样,代码如下:

1
2
3
4
5
6
7
8
9
10
// include\execution\hash_join_executor.cpp
bool HashJoinExecutor::Next(Tuple *tuple, RID *rid) {
if (now_id_ < result_.size()) {
*tuple = result_[now_id_];
*rid = tuple->GetRid();
now_id_++;
return true;
}
return false;
}

Aggregation

聚合函数的实现仍然是通过哈希表的,比较复杂。不过这里基本已经给好了,我们只需要调用即可。在Init函数中,我们只需将子执行器的数据插入到哈希表中,然后初始化迭代器即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// execution\aggregation_executor.cpp
void AggregationExecutor::Init() {
child_->Init();
Tuple tuple;
RID rid;
try {
// 将子查询结果插入到哈希表中
while (child_->Next(&tuple, &rid)) {
aht_.InsertCombine(MakeAggregateKey(&tuple), MakeAggregateValue(&tuple));
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "AggregationExecutor:child execute error.");
}
aht_iterator_ = aht_.Begin();
}

在Next函数中,则对哈希表进行遍历。遍历的同时要判定Having条件,其余与之前的类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// execution\aggregation_executor.cpp
bool AggregationExecutor::Next(Tuple *tuple, RID *rid) {
if (aht_iterator_ == aht_.End()) {
return false;
}
const AggregateKey &agg_key = aht_iterator_.Key();
const AggregateValue &agg_value = aht_iterator_.Val();
++aht_iterator_;

// 判断Having条件,符合返回,不符合则继续查找
if (plan_->GetHaving() == nullptr ||
plan_->GetHaving()->EvaluateAggregate(agg_key.group_bys_, agg_value.aggregates_).GetAs<bool>()) {
std::vector<Value> ret;
for (const auto &col : plan_->OutputSchema()->GetColumns()) {
ret.push_back(col.GetExpr()->EvaluateAggregate(agg_key.group_bys_, agg_value.aggregates_));
}
*tuple = Tuple(ret, plan_->OutputSchema());
return true;
}
return Next(tuple, rid);
}

Limit

这个也比较简单,成员变量中保存一个计数器,然后遍历即可。全部代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// execution\limit_executor.cpp
LimitExecutor::LimitExecutor(ExecutorContext *exec_ctx, const LimitPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor)
: AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

void LimitExecutor::Init() {
child_executor_->Init();
output_num_ = 0;
}

bool LimitExecutor::Next(Tuple *tuple, RID *rid) {
Tuple child_tuple;
RID child_rid;
while (true) {
try {
if (!child_executor_->Next(&child_tuple, &child_rid)) {
break;
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "LimitExecutor:child execute error.");
return false;
}
if (output_num_ < plan_->GetLimit()) {
output_num_++;
*tuple = child_tuple;
*rid = child_rid;
return true;
}
return false;
}
return false;
}

Distinct

该关键字主要用于去重,可以使用哈希表实现。这里让我们参考AggregationExecutor类,找到了该类的Plan头文件,大概知道我们要为所有行定义出一个Key,并重载Hash函数用于哈希表。

由于不能修改Plan Node文件,故我们只能将新的DistinctKey定义在执行器的头文件中。这里要注意,哈希函数的重载是在std命名空间中完成的,而DistinctKey和后续头文件的修改要在bustub中完成。为保证引用次序,需要先开一个namespace bustub定义DistinctKey,然后再在std namespace中定义Hash函数的重载。最后是该头文件原本的内容。修改后的定义和头文件引用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// include\execution\executors\distinct_executor.h
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/util/hash_util.h"
#include "execution/executors/abstract_executor.h"
#include "execution/plans/distinct_plan.h"
namespace bustub {
struct DistinctKey {
/** The group-by values */
std::vector<Value> distincts_;

/**
* Compares two aggregate keys for equality.
* @param other the other aggregate key to be compared with
* @return `true` if both aggregate keys have equivalent group-by expressions, `false` otherwise
*/
bool operator==(const DistinctKey &other) const {
for (uint32_t i = 0; i < other.distincts_.size(); i++) {
if (distincts_[i].CompareEquals(other.distincts_[i]) != CmpBool::CmpTrue) {
return false;
}
}
return true;
}
};
} // namespace bustub

namespace std {
/** Implements std::hash on AggregateKey */
template <>
struct hash<bustub::DistinctKey> {
std::size_t operator()(const bustub::DistinctKey &agg_key) const {
size_t curr_hash = 0;
for (const auto &key : agg_key.distincts_) {
if (!key.IsNull()) {
curr_hash = bustub::HashUtil::CombineHashes(curr_hash, bustub::HashUtil::HashValue(&key));
}
}
return curr_hash;
}
};
} // namespace std

还要在头文件中定义哈希表和迭代器。哈希表保存了Key和Tuple组成的pair。

1
2
3
4
5
6
7
8
// include\execution\executors\distinct_executor.h
private:
/** The distinct plan node to be executed */
const DistinctPlanNode *plan_;
/** The child executor from which tuples are obtained */
std::unique_ptr<AbstractExecutor> child_executor_;
std::unordered_map<DistinctKey, Tuple> map_;
std::unordered_map<DistinctKey, Tuple>::iterator iter_;

然后是Init函数,在Init中需要先把哈希表构造出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// include\execution\distinct_executor.cpp
void DistinctExecutor::Init() {
child_executor_->Init();
Tuple child_tuple;
RID child_rid;
while (child_executor_->Next(&child_tuple, &child_rid)) {
// 构造Key
DistinctKey dis_key;
dis_key.distincts_.reserve(plan_->OutputSchema()->GetColumnCount());
for (uint32_t idx = 0; idx < dis_key.distincts_.capacity(); idx++) {
dis_key.distincts_.push_back(child_tuple.GetValue(plan_->OutputSchema(), idx));
}

// 根据Key进行插入
if (map_.count(dis_key) == 0) {
map_.insert({dis_key, child_tuple});
}
}
iter_ = map_.begin();
}

最后实现Next函数,使用迭代器遍历哈希表即可:

1
2
3
4
5
6
7
8
9
10
// include\execution\distinct_executor.cpp
bool DistinctExecutor::Next(Tuple *tuple, RID *rid) {
if (iter_ == map_.end()) {
return false;
}
*tuple = iter_->second;
*rid = iter_->second.GetRid();
iter_++;
return true;
}

ExecutionEngine

还要补全这里的一个函数,不过并不难,就是catch住异常然后接着往上抛即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// include\execution\execution_engine.cpp
try {
Tuple tuple;
RID rid;
while (executor->Next(&tuple, &rid)) {
if (result_set != nullptr) {
result_set->push_back(tuple);
}
}
} catch (Exception &e) {
// TODO(student): handle exceptions
throw Exception(ExceptionType::UNKNOWN_TYPE, "InsertExecutor:child execute error.");
return false;
}

总结

这部分是实现了一个执行器,能实现一些常见的SQL语句的功能。我们并没有SQL解析器,因此所有的操作都是在测试文件中手工编写的。相比上一个Lab,这个简单了许多,而且相当部分的代码不需要自己完成。我觉得这个实验的目的是通过执行器,了解一下SQL语句的执行方式和执行顺序。

参考

https://zhuanlan.zhihu.com/p/355478771
https://www.cnblogs.com/sun-lingyu/p/15316626.html