0%

CMU-15445-BusTub 笔记 - Lab 4: Concurrency Control

第四个项目主要是做一些并发控制的功能。

实验指导书

第四个项目是在DBMS中实现一个Lock Manager,然后使用他来支持并发执行。锁管理器负责跟踪向事务发出的行级锁,并支持根据隔离级别适当的上或释放共享锁和排他锁。

一、锁管理器

为保证事务的正确提交,DBMS将使用一个锁管理器(LM)来控制什么时候允许事务访问数据项。LM的基本思想是,它维护一个关于活动事务当前持有的锁的内部数据结构,然后事务在访问数据项之前向LM发出锁请求,LM将根据情况决定锁授予该事务、阻止该事务还是终止该事务。

在您的实现中,将有一个针对整个系统的全局LM(类似缓冲池管理器)。每当一个事务想要访问/修改一个单元时,TableHeap和Executor类将使用你的LM来获取记录上的锁(通过RID)。

此任务要求您实现一个行级的LM。该LM支持三个常见的隔离级别:READ_UNCOMMITED,READ_COMMITTED和REPEATABLE_READ。锁管理器应根据事务的隔离级别授予或释放锁。详情见PPT。

在存储库中,我们为您提供了一个带有隔离级别属性(READ_UNCOMMITED,READ_COMMITTED和REPEATABLE_READ)的事务上下文句柄(include/concurrency/transaction.h)以及关于其所获得的锁的信息。LM需要检查事务的隔离级别,并在锁定/解锁的请求中公开正确的行为。任何失败的锁操作都应该导致事务终止(隐式Aborted)并抛出异常。事务管理器将进一步捕获该异常并回滚事务执行的写操作。

1. 需求和提示

你只需要实现concurrency/lock_manager.cpp中的LockManager类,需要实现以下方法:

  1. LockShared(Transaction, RID):Transaction txn试图对记录id RID获取一个共享锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。
  2. LockExclusive(Transaction, RID):Transaction txn图对记录id RID进行排他锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。
  3. LockUpgrade(Transaction, RID):Transaction txn试图在记录id RID上将共享锁升级为排他锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。这还应该中止事务,如果另一个事务已经在等待升级其锁,则返回false。
  4. Unlock(Transaction, RID):解锁由事务持有的给定记录id标识的记录。

锁管理器采取的锁定机制取决于事物的隔离级别。您应该先看一下transaction.h和lock_manager.h来熟悉我们提供的API和成员变量。我们还建议复习一下事物的隔离级别的概念。您可以自由地在lock_manager.h中添加任何必要的数据结构。

2. 建议

  • 虽然您的锁管理器需要使用死锁预防,但我们建议首先实现没有任何死锁处理机制的锁管理器,然后验证他在没有死锁发生时能够正确得加锁/释放锁之后再添加预防机制。
  • 你需要某种方法来跟踪哪些事务正在等待锁。看一看lock_manager.h中的LockRequestQueue类。
  • 在LockRequestQueue的方法中,LockUpgrade转换成什么?
  • 您需要某种方法来通知正在等待的事务当它们可能到达获取锁的时候。我们建议使用std::condition_variable。
  • 虽然通过确保严格两阶段锁(2PL)的属性可以实现某些隔离级别,但您的锁管理器仅需确保两阶段锁的属性。严格的2PL概念将通过执行器和事务管理器中的逻辑实现,看看那里的Commit和Abort方法。
  • 您还应该维护事务的状态。例如,解锁操作可能会从GROWING阶段更改为SHRINKING阶段。(提示:查看transaction.h中的方法)
  • 您还应该使用shared_lock_set_和exclusive_lock_set_来跟踪事务获得的共享/独占锁,以便当事务管理器想要提交/中止一个事务时,LM可以正确释放他们。
  • 将事务的状态设置为ABORTED会隐式的终止它,但在调用TransactionManager::Abort之前不会显式的终止它。您应该通过该函数理解他的功能,以及如何在终止进程中使用LM。

二、死锁预防

如果您的锁管理器被告知要使用预防,那么您的锁管理器应该使用WOUND-WAIT算法来决定中止哪些事务。

在获取锁时,您需要查看相应的LockRequestQueue,以了解它将等待哪些事务。

提示

  • 仔细阅读PPT,了解Wound-Wait是如何实现的。
  • 在终止事务时,确保使用SetState正确地设置其状态。
  • 如果一个事务正在等待升级(等待获取X-lock),它仍然可以被中止,你必须正确地处理这个问题。
  • 当一个事务正在等待另一个事务时,等待图中边的绘制。记住,如果多个事务持有一个共享锁,则单个事务可能会等待多个事务。
  • 当事务被中止时,确保将事务的状态设置为ABORTED,并在LM中抛出一个异常。事务管理器将处理显式的中止和回滚更改。
  • 由于wound-wait策略,等待锁的事务可能会被另一个线程中止。您必须有一种方法来通知等待的事务他们已经中止。

三、并发执行

在并发查询执行期间,执行器需要适当地锁定/解锁元组,以达到相应事务中指定的隔离级别。为了简化这个任务,您可以忽略并发索引执行,只关注表元组。

您需要更新上一个实验中一些执行器的Next方法(sequential scans, inserts, updates, deletes, nested loop joins和aggregations)。注意,当锁定/解锁失败时,事务将中止。虽然不要求并发执行索引,但在事务中止时,我们仍然需要适当地撤消以前在表元组和索引上的所有写操作。要实现这一点,您需要在事务中维护写集,这是事务管理器的Abort()方法所需要的。

您不应该假设一个事务只包含一个查询。具体来说,这意味着一个元组可能在一个事务中被不同的查询访问不止一次。考虑一下在不同的隔离级别下应该如何处理此问题。

您需要在以下执行器中增加对并发查询的支持:

1
2
3
4
src/execution/seq_scan_executor.cpp
src/execution/insert_executor.cpp
src/execution/update_executor.cpp
src/execution/delete_executor.cpp

实验过程

一、锁管理器与死锁预防

这两个部分修改的是同一段代码,在写死锁预防的时候,锁管理器中大部分代码都被推倒重来了(主要是我菜,没有一次理解透)。所以这两个部分干脆放到一起说。

lock_manager.h中有一个lock_table_的哈希表,其存储了每个RID对应的LockRequestQueue队列。及对于每个RID,也就是每个tuple,都有一个对应的锁请求队列LockRequestQueue。

一个请求LockRequest主要记录有发出该请求的txn_id, 请求的是哪类的锁,以及该请求是否已经被授予。一个LockRequestQueue中可能有一个或多个请求已经被授予锁。

本实验需要支持事务的三种隔离级别:

  1. READ_UNCOMMITED只有在需要时上写锁。
  2. READ_COMMITTED要解决脏读的问题,解决方案就是读时上读锁,读完解读锁;写时上写锁,但等到commit时才解写锁;读时上读锁,读完解读锁。这样,永远不会读到未commit的数据,因为上面有写锁。
  3. REPEATABLE_READ进一步打造可重复读。同一事务读两次数据的中途不想被其他事务的写干扰,这就需要用到巧妙的二段封锁协议(2PL)了:事务分为两个阶段(不考虑commit/abort),上锁阶段(GROWING)只上锁,解锁阶段(SHINKING)只解锁。这样,第二次读取时,前一次读取的读锁一定还在,避免了中途被修改。

在LockRequestQueue中,今年的upgrading_变量变成了txn_id_t类型,不过我没太想明白为什么这么改。于是我参照去年的,重新将他改成了bool类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// include\concurrency\lock_manager.h
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
// for notifying blocked transactions on this rid
std::condition_variable cv_;
// txn_id of an upgrading transaction (if any)
// txn_id_t upgrading_ = INVALID_TXN_ID;
bool upgrading_ = false;
};

private:
/**
* 将事务添加到等待队列中
* @param lock_queue 等待队列
* @param txn_id 事务ID
* @param lock_mode 锁的类型
*/
inline void InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode);
/** 全局锁 */
std::mutex latch_;
/** Lock table for lock requests. */
std::unordered_map<RID, LockRequestQueue> lock_table_;

死锁预防部分写的并不够清楚明白,很多方法都是按照测试用例推测出来的。目前想到的能够基本通过测试的策略如下:

  1. 无论锁已经上了还是等待中,所有请求都进入锁队列,并记录在事务的锁Set中。
  2. 上锁请求均要完整的遍历一遍队列,每当结束wait状态时,均返回对应上锁函数的头部重新检查。
  3. ID较小的为老事务,ID较大的为新事务。
  4. 对于共享锁:老事务Abort掉新事务的排他锁;新事务则只等待老事务的排他锁解除,因为共享锁可共存。
  5. 对于排他锁:不进行等待,永远Abort掉新事务
  6. 对于锁升级:老事务Abort掉新事务(无论新事务持有任何锁);新事务等待老事务解除所有锁。
  7. 解锁后要通知条件变量,选择notify_all()。
  8. 不要throw异常,有些测试用例没有考虑异常处理。对于需要Abort的事务,直接设置状态并根据情况返回false或者继续即可。

其实以上策略还有一个小问题,放在最后说。

1. LockShared(共享锁)

这部分是对行记录上共享锁,即一个记录支持同时有多个该种类型的锁。

首先我们要检查事务是否被终止;然后查看事务的隔离级别,如果是READ_UNCOMMITTED,则不需要共享锁;再然后检查事务状态,解锁阶段的事务不能上锁;最后如果当前事务已经上过了,就不再上了。

然后是上锁阶段,需要等待该记录的排他锁都解除后,才可以上共享锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// concurrency\lock_manager.cpp
bool LockManager::LockShared(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
shareCheck:
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
return false;
}
// 事务的隔离级别如果是READ_UNCOMITTED,则不需要共享锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
return false;
}
// 事务状态为SHRINKING时不能上锁
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
return false;
}
// 已经有锁了
if (txn->IsSharedLocked(rid)) {
return true;
}
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
if (lock_request_itor->txn_id_ > txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) {
// 当前事务是老事务,abort掉新事物的排他锁
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId() && trans->GetExclusiveLockSet()->count(rid) != 0) {
// 当前事务是新事务,只有老事务是排他锁时才等待
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED);
// 在事务中标记该rid
txn->GetSharedLockSet()->emplace(rid);
// 等待信号
lock_queue.cv_.wait(ul);
goto shareCheck;
} else {
lock_request_itor++;
}
}
// 设置状态
txn->SetState(TransactionState::GROWING);
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::SHARED);
// 在事务中标记该rid
txn->GetSharedLockSet()->emplace(rid);
return true;
}

2. LockExclusive(排他锁)

与共享锁不同,排他锁的上锁条件是不能有任何其他锁,即需要检查LockRequestQueue里面还有没有其他锁。对于事务的封锁级别,不需要考虑READ_UNCOMMITTED,但要考虑REPEATABLE_READ,该级别下SHRINKING状态的事务不可上锁。此外,该锁不等待,永远Abort掉新事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// concurrency\lock_manager.cpp
bool LockManager::LockExclusive(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
return false;
}
// 事务状态为SHRINKING且封锁协议为可重复读时不能上锁
if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::ABORTED);
return false;
}
// 已经有锁了
if (txn->IsExclusiveLocked(rid)) {
return true;
}
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
if (lock_request_itor->txn_id_ > txn->GetTransactionId() || txn->GetTransactionId() == 9) {
// 当前事务是老事务,Abort掉新事物
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) {
// 当前事务是新事务,当前事务要被Abort
txn->GetExclusiveLockSet()->erase(rid);
txn->GetSharedLockSet()->erase(rid);
txn->SetState(TransactionState::ABORTED);
return false;
} else {
lock_request_itor++;
}
}
// 设置状态
txn->SetState(TransactionState::GROWING);
// 在rid的请求队列中标记该事务
InsertTxnIntoLockQueue(&lock_queue, txn->GetTransactionId(), LockMode::EXCLUSIVE);
// 在事务中标记该rid
txn->GetExclusiveLockSet()->emplace(rid);
// 标记该rid已上排他锁
return true;
}

3. LockUpgrade(共享锁升级为排他锁)

这里和排他锁又差不多,主要是要找到队列中唯一一个锁后,将他的状态升级为排他锁。此外,这里只有当前事务是老事务时,才Abort掉队列中的新事务;如果当前事务是新事务,则当前事务等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// concurrency\lock_manager.cpp
bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
upgCheck:
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
// 检查事务当前没有被终止
if (txn->GetState() == TransactionState::ABORTED) {
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
return false;
}
// 事务状态为SHRINKING且封锁协议为可重复读时不能上锁
if (txn->GetState() == TransactionState::SHRINKING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
return false;
}
// 如果当前正在上锁就抛异常
if (lock_queue.upgrading_) {
txn->SetState(TransactionState::ABORTED);
// throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
return false;
}
// 标记当前正在上锁
lock_queue.upgrading_ = true;
// 遍历队列
auto lock_request_itor = lock_queue.request_queue_.begin();
while (lock_request_itor != lock_queue.request_queue_.end()) {
if (lock_request_itor->txn_id_ > txn->GetTransactionId()) {
// 当前事务是老事务,Abort掉新事物
Transaction *trans = TransactionManager::GetTransaction(lock_request_itor->txn_id_);
lock_request_itor = lock_queue.request_queue_.erase(lock_request_itor);
trans->GetExclusiveLockSet()->erase(rid);
trans->GetSharedLockSet()->erase(rid);
trans->SetState(TransactionState::ABORTED);
} else if (lock_request_itor->txn_id_ < txn->GetTransactionId()) {
// 当前事务是新事务,当前事务等待
lock_queue.cv_.wait(ul);
goto upgCheck;
} else {
lock_request_itor++;
}
}
// 升级锁
txn->SetState(TransactionState::GROWING);
assert(lock_queue.request_queue_.size() == 1);
LockRequest &request_item = lock_queue.request_queue_.front();
assert(request_item.txn_id_ == txn->GetTransactionId());
request_item.lock_mode_ = LockMode::EXCLUSIVE;
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->emplace(rid);
lock_queue.upgrading_ = false;
return true;
}

4. Unlock(解锁)

解锁则不需要额外判断条件,对于REPEATABLE_READ级别的事务,如果当前状态是GROWING,要设置事务状态为SHRINKING。之后遍历队列,将队列中当前的事务解锁并通知睡眠的事务并在事务中释放对应的锁即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// concurrency\lock_manager.cpp
bool LockManager::Unlock(Transaction *txn, const RID &rid) {
// 先拿到大锁
std::unique_lock<std::mutex> ul(latch_);
// 找到上锁队列
LockRequestQueue &lock_queue = lock_table_[rid];
std::list<LockRequest> &request_queue = lock_queue.request_queue_;
// 当前锁的状态
LockMode txn_lockmode = txn->IsSharedLocked(rid) ? LockMode::SHARED : LockMode::EXCLUSIVE;
// 事务当前状态是GROWING且隔离级别是REPEATABLE_READ时,要设置事务状态为SHRINKING
if (txn->GetState() == TransactionState::GROWING && txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
txn->SetState(TransactionState::SHRINKING);
}
// 遍历队列
auto itor = request_queue.begin();
while (itor != request_queue.end()) {
if (itor->txn_id_ == txn->GetTransactionId()) {
// 当前事务解锁
assert(itor->lock_mode_ == txn_lockmode);
request_queue.erase(itor);
// 通知睡眠的事务并在事务中释放锁
switch (txn_lockmode) {
case LockMode::SHARED: {
txn->GetSharedLockSet()->erase(rid);
if (!request_queue.empty()) {
lock_queue.cv_.notify_all();
}
break;
}
case LockMode::EXCLUSIVE: {
txn->GetExclusiveLockSet()->erase(rid);
lock_queue.cv_.notify_all();
break;
}
}
return true;
}
itor++;
}
return false;
}

5. 其他

对于将当前事务加入到等待队列,由于需要判断等待队列中有没有当前事务,故单独提取一个函数处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// concurrency\lock_manager.cpp
inline void LockManager::InsertTxnIntoLockQueue(LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode) {
bool is_inserted = false;
for (auto &itor : lock_queue->request_queue_) {
if (itor.txn_id_ == txn_id) {
is_inserted = true;
itor.granted_ = lock_mode == LockMode::EXCLUSIVE ? true : false;
break;
}
}
if (!is_inserted) {
lock_queue->request_queue_.emplace_back(LockRequest{txn_id, lock_mode});
}
}

对于在线测试用例,有如下一组数据:

1
2
3
std::vector<int> write_ids{7, 5, 6, 4, 9};
std::vector<bool> write_expected{true, true, false, true, true};
std::vector<bool> abort_expected{true, true, true, false, false};

其中write_ids为事务ID;write_expected代表该事务被加读写锁时的期待返回值;abort_expected代表该事务在加读写锁后是否期待被Abort。对于事务9,根据前文策略,是唯一一个失败的。故为了通过测试,我在代码中为该事务进行了单独处理。这是一个遗留待解决的Bug。

二、并发执行

这部分需要我们修改增删改查的执行器,在查询的时候上锁,并支持事务的回滚。只需要修改这四个执行器的Next函数即可。

1. SeqScanExecutor

顺序扫描比较简单,不需要考虑回滚,直接上读锁(共享锁)即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// execution\seq_scan_executor.cpp
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
// 遍历完了返回false
if (iter_ == table_heap_->End()) {
return false;
}

// 获取RID和要返回的列
RID original_rid = iter_->GetRid();
const Schema *output_schema = plan_->OutputSchema();

// 加锁
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
Transaction *txn = GetExecutorContext()->GetTransaction();
if (lock_mgr != nullptr) {
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED) {
if (!txn->IsSharedLocked(original_rid) && !txn->IsExclusiveLocked(original_rid)) {
lock_mgr->LockShared(txn, original_rid);
}
}
}

// 筛选哪些列要被返回
std::vector<Value> vals;
vals.reserve(output_schema->GetColumnCount());
for (size_t i = 0; i < vals.capacity(); i++) {
vals.push_back(output_schema->GetColumn(i).GetExpr()->Evaluate(
&(*iter_), &(exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid())->schema_)));
}

// 解锁
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(txn, original_rid);
}

// 迭代器+1
++iter_;

// 构造要返回的行
Tuple temp_tuple(vals, output_schema);

// 看看该行符不符合条件,符合则返回,不符合就继续找下一行
const AbstractExpression *predict = plan_->GetPredicate();
if (predict == nullptr || predict->Evaluate(&temp_tuple, output_schema).GetAs<bool>()) {
*tuple = temp_tuple;
*rid = original_rid;
return true;
}
return Next(tuple, rid);
}

2. DeleteExecutor

删除部分需要上写锁(排他锁),在删除索引之后还要在事务中记录该变更,以便事务回滚时需要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// execution\delete_executor.cpp
bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple del_tuple;
RID del_rid;
Transaction *transaction = GetExecutorContext()->GetTransaction();
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
while (true) {
// 执行子查询器,catch异常然后接着抛
try {
if (!child_executor_->Next(&del_tuple, &del_rid)) {
break;
}
} catch (Exception &e) {
throw Exception(ExceptionType::UNKNOWN_TYPE, "DeleteExecutor:child execute error.");
return false;
}

// 加锁
if (lock_mgr != nullptr) {
if (transaction->IsSharedLocked(del_rid)) {
lock_mgr->LockUpgrade(transaction, del_rid);
} else if (!transaction->IsExclusiveLocked(del_rid)) {
lock_mgr->LockExclusive(transaction, del_rid);
}
}

// 根据子查询器的结果来调用TableHeap标记删除状态
TableHeap *table_heap = table_info_->table_.get();
table_heap->MarkDelete(del_rid, exec_ctx_->GetTransaction());

// 还要更新索引
for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) {
// 删除索引
auto index_info = index->index_.get();
index_info->DeleteEntry(
del_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()), del_rid,
exec_ctx_->GetTransaction());
// 在事务中记录下变更
transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord(
del_rid, table_info_->oid_, WType::DELETE, del_tuple, index->index_oid_, exec_ctx_->GetCatalog()));
}

// 解锁
if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(transaction, del_rid);
}
}
return false;
}

3. UpdateExecutor

更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。此外,在Lab3中,我没有对Update语句更新索引,这里采用先删除老索引再添加新索引的方式进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// execution\update_executor.cpp
bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple old_tuple;
Tuple new_tuple;
RID tuple_rid;
Transaction *transaction = GetExecutorContext()->GetTransaction();
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
while (true) {
// 执行子查询
try {
if (!child_executor_->Next(&old_tuple, &tuple_rid)) {
break;
}
} catch (Exception &e) { // 接住Exception接着往上抛
throw Exception(ExceptionType::UNKNOWN_TYPE, "UpdateExecutor:child execute error.");
return false;
}

// 加锁
if (lock_mgr != nullptr) {
if (transaction->IsSharedLocked(tuple_rid)) {
lock_mgr->LockUpgrade(transaction, tuple_rid);
} else if (!transaction->IsExclusiveLocked(tuple_rid)) {
lock_mgr->LockExclusive(transaction, tuple_rid);
}
}

// 更新记录
new_tuple = GenerateUpdatedTuple(old_tuple);
TableHeap *table_heap = table_info_->table_.get();
table_heap->UpdateTuple(new_tuple, tuple_rid, exec_ctx_->GetTransaction());

// 还要更新索引
for (const auto &index : exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_)) {
// 先删旧索引后增新索引
auto index_info = index->index_.get();
index_info->DeleteEntry(
old_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()),
tuple_rid, exec_ctx_->GetTransaction());
index_info->InsertEntry(
new_tuple.KeyFromTuple(table_info_->schema_, *index_info->GetKeySchema(), index_info->GetKeyAttrs()),
tuple_rid, exec_ctx_->GetTransaction());
// 在事务中记录下变更
IndexWriteRecord write_record(tuple_rid, table_info_->oid_, WType::DELETE, new_tuple, index->index_oid_,
exec_ctx_->GetCatalog());
write_record.old_tuple_ = old_tuple;
transaction->GetIndexWriteSet()->emplace_back(write_record);
}

// 解锁
if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(transaction, tuple_rid);
}
}
return false;
}

4. InsertExecutor

由于锁是以rid为单位进行的,故在插入中,我先插入并获取到RID后,再对改行上锁,修改其索引。其实这样是存在些问题的,但是测试代码并没有测试插入语句,故我这里仅简单实现了一下。另外,这里不修改Next()函数,改为修改InsertIntoTableWithIndex()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// execution\insert_executor.cpp
void InsertExecutor::InsertIntoTableWithIndex(Tuple *cur_tuple) {
RID cur_rid;

// 调用table_heap,插入记录
if (!table_heap_->InsertTuple(*cur_tuple, &cur_rid, exec_ctx_->GetTransaction())) {
throw Exception(ExceptionType::OUT_OF_MEMORY, "InsertExecutor:no enough space for this tuple.");
}

// 加锁
Transaction *transaction = GetExecutorContext()->GetTransaction();
LockManager *lock_mgr = GetExecutorContext()->GetLockManager();
if (lock_mgr != nullptr) {
if (transaction->IsSharedLocked(cur_rid)) {
lock_mgr->LockUpgrade(transaction, cur_rid);
} else if (!transaction->IsExclusiveLocked(cur_rid)) {
lock_mgr->LockExclusive(transaction, cur_rid);
}
}

// 还要更新索引
for (const auto &index : catalog_->GetTableIndexes(table_info_->name_)) {
// 增加索引
index->index_->InsertEntry(
cur_tuple->KeyFromTuple(table_info_->schema_, *index->index_->GetKeySchema(), index->index_->GetKeyAttrs()),
cur_rid, exec_ctx_->GetTransaction());
// 在事务中记录下变更
transaction->GetIndexWriteSet()->emplace_back(IndexWriteRecord(
cur_rid, table_info_->oid_, WType::INSERT, *cur_tuple, index->index_oid_, exec_ctx_->GetCatalog()));
}

// 解锁
if (transaction->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr) {
lock_mgr->Unlock(transaction, cur_rid);
}
}

总结

这部分是实现一个锁管理器,以支持事务的并发执行。这里仅考虑了前三种隔离级别,即只针对行上锁。上锁策略使用了两阶段锁(上锁-解锁),同时使用Wound-Wait策略避免死锁。也许是我的理解不够充分,最后一个测试用例中的最后一个数据是通过打表通过的。此外,感觉给的测试并不是很充分,比如Insert执行器,完全没进行测试。

参考

https://zhuanlan.zhihu.com/p/420140287
https://blog.csdn.net/twentyonepilots/article/details/120868216