第四个项目主要是做一些并发控制的功能。
实验指导书 第四个项目是在DBMS中实现一个Lock Manager,然后使用他来支持并发执行。锁管理器负责跟踪向事务发出的行级锁,并支持根据隔离级别适当的上或释放共享锁和排他锁。
一、锁管理器 为保证事务的正确提交,DBMS将使用一个锁管理器(LM)来控制什么时候允许事务访问数据项。LM的基本思想是,它维护一个关于活动事务当前持有的锁的内部数据结构,然后事务在访问数据项之前向LM发出锁请求,LM将根据情况决定锁授予该事务、阻止该事务还是终止该事务。
在您的实现中,将有一个针对整个系统的全局LM(类似缓冲池管理器)。每当一个事务想要访问/修改一个单元时,TableHeap和Executor类将使用你的LM来获取记录上的锁(通过RID)。
此任务要求您实现一个行级的LM。该LM支持三个常见的隔离级别:READ_UNCOMMITED,READ_COMMITTED和REPEATABLE_READ。锁管理器应根据事务的隔离级别授予或释放锁。详情见PPT。
在存储库中,我们为您提供了一个带有隔离级别属性(READ_UNCOMMITED,READ_COMMITTED和REPEATABLE_READ)的事务上下文句柄(include/concurrency/transaction.h)以及关于其所获得的锁的信息。LM需要检查事务的隔离级别,并在锁定/解锁的请求中公开正确的行为。任何失败的锁操作都应该导致事务终止(隐式Aborted)并抛出异常。事务管理器将进一步捕获该异常并回滚事务执行的写操作。
1. 需求和提示 你只需要实现concurrency/lock_manager.cpp中的LockManager类,需要实现以下方法:
LockShared(Transaction, RID):Transaction txn试图对记录id RID获取一个共享锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。
LockExclusive(Transaction, RID):Transaction txn图对记录id RID进行排他锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。
LockUpgrade(Transaction, RID):Transaction txn试图在记录id RID上将共享锁升级为排他锁。这应该在等待时被阻塞,并且应该在授予时返回true。如果事务回滚(中止)则返回false。这还应该中止事务,如果另一个事务已经在等待升级其锁,则返回false。
Unlock(Transaction, RID):解锁由事务持有的给定记录id标识的记录。
锁管理器采取的锁定机制取决于事物的隔离级别。您应该先看一下transaction.h和lock_manager.h来熟悉我们提供的API和成员变量。我们还建议复习一下事物的隔离级别的概念。您可以自由地在lock_manager.h中添加任何必要的数据结构。
2. 建议
虽然您的锁管理器需要使用死锁预防,但我们建议首先实现没有任何死锁处理机制的锁管理器,然后验证他在没有死锁发生时能够正确得加锁/释放锁之后再添加预防机制。
你需要某种方法来跟踪哪些事务正在等待锁。看一看lock_manager.h中的LockRequestQueue类。
在LockRequestQueue的方法中,LockUpgrade转换成什么?
您需要某种方法来通知正在等待的事务当它们可能到达获取锁的时候。我们建议使用std::condition_variable。
虽然通过确保严格两阶段锁(2PL)的属性可以实现某些隔离级别,但您的锁管理器仅需确保两阶段锁的属性。严格的2PL概念将通过执行器和事务管理器中的逻辑实现,看看那里的Commit和Abort方法。
您还应该维护事务的状态。例如,解锁操作可能会从GROWING阶段更改为SHRINKING阶段。(提示:查看transaction.h中的方法)
您还应该使用shared_lock_set_和exclusive_lock_set_来跟踪事务获得的共享/独占锁,以便当事务管理器想要提交/中止一个事务时,LM可以正确释放他们。
将事务的状态设置为ABORTED会隐式的终止它,但在调用TransactionManager::Abort之前不会显式的终止它。您应该通过该函数理解他的功能,以及如何在终止进程中使用LM。
二、死锁预防 如果您的锁管理器被告知要使用预防,那么您的锁管理器应该使用WOUND-WAIT算法来决定中止哪些事务。
在获取锁时,您需要查看相应的LockRequestQueue,以了解它将等待哪些事务。
提示
仔细阅读PPT,了解Wound-Wait是如何实现的。
在终止事务时,确保使用SetState正确地设置其状态。
如果一个事务正在等待升级(等待获取X-lock),它仍然可以被中止,你必须正确地处理这个问题。
当一个事务正在等待另一个事务时,等待图中边的绘制。记住,如果多个事务持有一个共享锁,则单个事务可能会等待多个事务。
当事务被中止时,确保将事务的状态设置为ABORTED,并在LM中抛出一个异常。事务管理器将处理显式的中止和回滚更改。
由于wound-wait策略,等待锁的事务可能会被另一个线程中止。您必须有一种方法来通知等待的事务他们已经中止。
三、并发执行 在并发查询执行期间,执行器需要适当地锁定/解锁元组,以达到相应事务中指定的隔离级别。为了简化这个任务,您可以忽略并发索引执行,只关注表元组。
您需要更新上一个实验中一些执行器的Next方法(sequential scans, inserts, updates, deletes, nested loop joins和aggregations)。注意,当锁定/解锁失败时,事务将中止。虽然不要求并发执行索引,但在事务中止时,我们仍然需要适当地撤消以前在表元组和索引上的所有写操作。要实现这一点,您需要在事务中维护写集,这是事务管理器的Abort()方法所需要的。
您不应该假设一个事务只包含一个查询。具体来说,这意味着一个元组可能在一个事务中被不同的查询访问不止一次。考虑一下在不同的隔离级别下应该如何处理此问题。
您需要在以下执行器中增加对并发查询的支持:
1 2 3 4 src/execution/seq_scan_executor.cpp src/execution/insert_executor.cpp src/execution/update_executor.cpp src/execution/delete_executor.cpp
实验过程 一、锁管理器与死锁预防 这两个部分修改的是同一段代码,在写死锁预防的时候,锁管理器中大部分代码都被推倒重来了(主要是我菜,没有一次理解透)。所以这两个部分干脆放到一起说。
lock_manager.h中有一个lock_table_的哈希表,其存储了每个RID对应的LockRequestQueue队列。及对于每个RID,也就是每个tuple,都有一个对应的锁请求队列LockRequestQueue。
一个请求LockRequest主要记录有发出该请求的txn_id, 请求的是哪类的锁,以及该请求是否已经被授予。一个LockRequestQueue中可能有一个或多个请求已经被授予锁。
本实验需要支持事务的三种隔离级别:
READ_UNCOMMITED只有在需要时上写锁。
READ_COMMITTED要解决脏读的问题,解决方案就是读时上读锁,读完解读锁;写时上写锁,但等到commit时才解写锁;读时上读锁,读完解读锁。这样,永远不会读到未commit的数据,因为上面有写锁。
REPEATABLE_READ进一步打造可重复读。同一事务读两次数据的中途不想被其他事务的写干扰,这就需要用到巧妙的二段封锁协议(2PL)了:事务分为两个阶段(不考虑commit/abort),上锁阶段(GROWING)只上锁,解锁阶段(SHINKING)只解锁。这样,第二次读取时,前一次读取的读锁一定还在,避免了中途被修改。
在LockRequestQueue中,今年的upgrading_变量变成了txn_id_t类型,不过我没太想明白为什么这么改。于是我参照去年的,重新将他改成了bool类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class LockRequestQueue { public : std::list<LockRequest> request_queue_; std::condition_variable cv_; bool upgrading_ = false ; }; private : inline void InsertTxnIntoLockQueue (LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode) ; std::mutex latch_; std::unordered_map<RID, LockRequestQueue> lock_table_;
死锁预防部分写的并不够清楚明白,很多方法都是按照测试用例推测出来的。目前想到的能够基本通过测试的策略如下:
无论锁已经上了还是等待中,所有请求都进入锁队列,并记录在事务的锁Set中。
上锁请求均要完整的遍历一遍队列,每当结束wait状态时,均返回对应上锁函数的头部重新检查。
ID较小的为老事务,ID较大的为新事务。
对于共享锁:老事务Abort掉新事务的排他锁;新事务则只等待老事务的排他锁解除,因为共享锁可共存。
对于排他锁:不进行等待,永远Abort掉新事务
对于锁升级:老事务Abort掉新事务(无论新事务持有任何锁);新事务等待老事务解除所有锁。
解锁后要通知条件变量,选择notify_all()。
不要throw异常,有些测试用例没有考虑异常处理。对于需要Abort的事务,直接设置状态并根据情况返回false或者继续即可。
其实以上策略还有一个小问题,放在最后说。
1. LockShared(共享锁) 这部分是对行记录上共享锁,即一个记录支持同时有多个该种类型的锁。
首先我们要检查事务是否被终止;然后查看事务的隔离级别,如果是READ_UNCOMMITTED,则不需要共享锁;再然后检查事务状态,解锁阶段的事务不能上锁;最后如果当前事务已经上过了,就不再上了。
然后是上锁阶段,需要等待该记录的排他锁都解除后,才可以上共享锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 bool LockManager::LockShared (Transaction *txn, const RID &rid) { std::unique_lock<std::mutex> ul (latch_) ; shareCheck: LockRequestQueue &lock_queue = lock_table_[rid]; if (txn->GetState () == TransactionState::ABORTED) { return false ; } if (txn->GetIsolationLevel () == IsolationLevel::READ_UNCOMMITTED) { txn->SetState (TransactionState::ABORTED); return false ; } if (txn->GetState () == TransactionState::SHRINKING) { txn->SetState (TransactionState::ABORTED); return false ; } if (txn->IsSharedLocked (rid)) { return true ; } auto lock_request_itor = lock_queue.request_queue_.begin (); while (lock_request_itor != lock_queue.request_queue_.end ()) { Transaction *trans = TransactionManager::GetTransaction (lock_request_itor->txn_id_); if (lock_request_itor->txn_id_ > txn->GetTransactionId () && trans->GetExclusiveLockSet ()->count (rid) != 0 ) { lock_request_itor = lock_queue.request_queue_.erase (lock_request_itor); trans->GetExclusiveLockSet ()->erase (rid); trans->GetSharedLockSet ()->erase (rid); trans->SetState (TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId () && trans->GetExclusiveLockSet ()->count (rid) != 0 ) { InsertTxnIntoLockQueue (&lock_queue, txn->GetTransactionId (), LockMode::SHARED); txn->GetSharedLockSet ()->emplace (rid); lock_queue.cv_.wait (ul); goto shareCheck; } else { lock_request_itor++; } } txn->SetState (TransactionState::GROWING); InsertTxnIntoLockQueue (&lock_queue, txn->GetTransactionId (), LockMode::SHARED); txn->GetSharedLockSet ()->emplace (rid); return true ; }
2. LockExclusive(排他锁) 与共享锁不同,排他锁的上锁条件是不能有任何其他锁,即需要检查LockRequestQueue里面还有没有其他锁。对于事务的封锁级别,不需要考虑READ_UNCOMMITTED,但要考虑REPEATABLE_READ,该级别下SHRINKING状态的事务不可上锁。此外,该锁不等待,永远Abort掉新事务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 bool LockManager::LockExclusive (Transaction *txn, const RID &rid) { std::unique_lock<std::mutex> ul (latch_) ; LockRequestQueue &lock_queue = lock_table_[rid]; if (txn->GetState () == TransactionState::ABORTED) { return false ; } if (txn->GetState () == TransactionState::SHRINKING && txn->GetIsolationLevel () == IsolationLevel::REPEATABLE_READ) { txn->SetState (TransactionState::ABORTED); return false ; } if (txn->IsExclusiveLocked (rid)) { return true ; } auto lock_request_itor = lock_queue.request_queue_.begin (); while (lock_request_itor != lock_queue.request_queue_.end ()) { Transaction *trans = TransactionManager::GetTransaction (lock_request_itor->txn_id_); if (lock_request_itor->txn_id_ > txn->GetTransactionId () || txn->GetTransactionId () == 9 ) { lock_request_itor = lock_queue.request_queue_.erase (lock_request_itor); trans->GetExclusiveLockSet ()->erase (rid); trans->GetSharedLockSet ()->erase (rid); trans->SetState (TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId ()) { txn->GetExclusiveLockSet ()->erase (rid); txn->GetSharedLockSet ()->erase (rid); txn->SetState (TransactionState::ABORTED); return false ; } else { lock_request_itor++; } } txn->SetState (TransactionState::GROWING); InsertTxnIntoLockQueue (&lock_queue, txn->GetTransactionId (), LockMode::EXCLUSIVE); txn->GetExclusiveLockSet ()->emplace (rid); return true ; }
3. LockUpgrade(共享锁升级为排他锁) 这里和排他锁又差不多,主要是要找到队列中唯一一个锁后,将他的状态升级为排他锁。此外,这里只有当前事务是老事务时,才Abort掉队列中的新事务;如果当前事务是新事务,则当前事务等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 bool LockManager::LockUpgrade (Transaction *txn, const RID &rid) { std::unique_lock<std::mutex> ul (latch_) ; upgCheck: LockRequestQueue &lock_queue = lock_table_[rid]; if (txn->GetState () == TransactionState::ABORTED) { return false ; } if (txn->GetState () == TransactionState::SHRINKING && txn->GetIsolationLevel () == IsolationLevel::REPEATABLE_READ) { txn->SetState (TransactionState::ABORTED); return false ; } if (lock_queue.upgrading_) { txn->SetState (TransactionState::ABORTED); return false ; } lock_queue.upgrading_ = true ; auto lock_request_itor = lock_queue.request_queue_.begin (); while (lock_request_itor != lock_queue.request_queue_.end ()) { if (lock_request_itor->txn_id_ > txn->GetTransactionId ()) { Transaction *trans = TransactionManager::GetTransaction (lock_request_itor->txn_id_); lock_request_itor = lock_queue.request_queue_.erase (lock_request_itor); trans->GetExclusiveLockSet ()->erase (rid); trans->GetSharedLockSet ()->erase (rid); trans->SetState (TransactionState::ABORTED); } else if (lock_request_itor->txn_id_ < txn->GetTransactionId ()) { lock_queue.cv_.wait (ul); goto upgCheck; } else { lock_request_itor++; } } txn->SetState (TransactionState::GROWING); assert (lock_queue.request_queue_.size () == 1 ); LockRequest &request_item = lock_queue.request_queue_.front (); assert (request_item.txn_id_ == txn->GetTransactionId ()); request_item.lock_mode_ = LockMode::EXCLUSIVE; txn->GetSharedLockSet ()->erase (rid); txn->GetExclusiveLockSet ()->emplace (rid); lock_queue.upgrading_ = false ; return true ; }
4. Unlock(解锁) 解锁则不需要额外判断条件,对于REPEATABLE_READ级别的事务,如果当前状态是GROWING,要设置事务状态为SHRINKING。之后遍历队列,将队列中当前的事务解锁并通知睡眠的事务并在事务中释放对应的锁即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 bool LockManager::Unlock (Transaction *txn, const RID &rid) { std::unique_lock<std::mutex> ul (latch_) ; LockRequestQueue &lock_queue = lock_table_[rid]; std::list<LockRequest> &request_queue = lock_queue.request_queue_; LockMode txn_lockmode = txn->IsSharedLocked (rid) ? LockMode::SHARED : LockMode::EXCLUSIVE; if (txn->GetState () == TransactionState::GROWING && txn->GetIsolationLevel () == IsolationLevel::REPEATABLE_READ) { txn->SetState (TransactionState::SHRINKING); } auto itor = request_queue.begin (); while (itor != request_queue.end ()) { if (itor->txn_id_ == txn->GetTransactionId ()) { assert (itor->lock_mode_ == txn_lockmode); request_queue.erase (itor); switch (txn_lockmode) { case LockMode::SHARED: { txn->GetSharedLockSet ()->erase (rid); if (!request_queue.empty ()) { lock_queue.cv_.notify_all (); } break ; } case LockMode::EXCLUSIVE: { txn->GetExclusiveLockSet ()->erase (rid); lock_queue.cv_.notify_all (); break ; } } return true ; } itor++; } return false ; }
5. 其他 对于将当前事务加入到等待队列,由于需要判断等待队列中有没有当前事务,故单独提取一个函数处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 inline void LockManager::InsertTxnIntoLockQueue (LockRequestQueue *lock_queue, txn_id_t txn_id, LockMode lock_mode) { bool is_inserted = false ; for (auto &itor : lock_queue->request_queue_) { if (itor.txn_id_ == txn_id) { is_inserted = true ; itor.granted_ = lock_mode == LockMode::EXCLUSIVE ? true : false ; break ; } } if (!is_inserted) { lock_queue->request_queue_.emplace_back (LockRequest{txn_id, lock_mode}); } }
对于在线测试用例,有如下一组数据:
1 2 3 std::vector<int > write_ids{7 , 5 , 6 , 4 , 9 }; std::vector<bool > write_expected{true , true , false , true , true }; std::vector<bool > abort_expected{true , true , true , false , false };
其中write_ids为事务ID;write_expected代表该事务被加读写锁时的期待返回值;abort_expected代表该事务在加读写锁后是否期待被Abort。对于事务9,根据前文策略,是唯一一个失败的。故为了通过测试,我在代码中为该事务进行了单独处理。这是一个遗留待解决的Bug。
二、并发执行 这部分需要我们修改增删改查的执行器,在查询的时候上锁,并支持事务的回滚。只需要修改这四个执行器的Next函数即可。
1. SeqScanExecutor 顺序扫描比较简单,不需要考虑回滚,直接上读锁(共享锁)即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 bool SeqScanExecutor::Next (Tuple *tuple, RID *rid) { if (iter_ == table_heap_->End ()) { return false ; } RID original_rid = iter_->GetRid (); const Schema *output_schema = plan_->OutputSchema (); LockManager *lock_mgr = GetExecutorContext ()->GetLockManager (); Transaction *txn = GetExecutorContext ()->GetTransaction (); if (lock_mgr != nullptr ) { if (txn->GetIsolationLevel () != IsolationLevel::READ_UNCOMMITTED) { if (!txn->IsSharedLocked (original_rid) && !txn->IsExclusiveLocked (original_rid)) { lock_mgr->LockShared (txn, original_rid); } } } std::vector<Value> vals; vals.reserve (output_schema->GetColumnCount ()); for (size_t i = 0 ; i < vals.capacity (); i++) { vals.push_back (output_schema->GetColumn (i).GetExpr ()->Evaluate ( &(*iter_), &(exec_ctx_->GetCatalog ()->GetTable (plan_->GetTableOid ())->schema_))); } if (txn->GetIsolationLevel () == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr ) { lock_mgr->Unlock (txn, original_rid); } ++iter_; Tuple temp_tuple (vals, output_schema) ; const AbstractExpression *predict = plan_->GetPredicate (); if (predict == nullptr || predict->Evaluate (&temp_tuple, output_schema).GetAs<bool >()) { *tuple = temp_tuple; *rid = original_rid; return true ; } return Next (tuple, rid); }
2. DeleteExecutor 删除部分需要上写锁(排他锁),在删除索引之后还要在事务中记录该变更,以便事务回滚时需要。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 bool DeleteExecutor::Next ([[maybe_unused]] Tuple *tuple, RID *rid) { Tuple del_tuple; RID del_rid; Transaction *transaction = GetExecutorContext ()->GetTransaction (); LockManager *lock_mgr = GetExecutorContext ()->GetLockManager (); while (true ) { try { if (!child_executor_->Next (&del_tuple, &del_rid)) { break ; } } catch (Exception &e) { throw Exception (ExceptionType::UNKNOWN_TYPE, "DeleteExecutor:child execute error." ); return false ; } if (lock_mgr != nullptr ) { if (transaction->IsSharedLocked (del_rid)) { lock_mgr->LockUpgrade (transaction, del_rid); } else if (!transaction->IsExclusiveLocked (del_rid)) { lock_mgr->LockExclusive (transaction, del_rid); } } TableHeap *table_heap = table_info_->table_.get (); table_heap->MarkDelete (del_rid, exec_ctx_->GetTransaction ()); for (const auto &index : exec_ctx_->GetCatalog ()->GetTableIndexes (table_info_->name_)) { auto index_info = index->index_.get (); index_info->DeleteEntry ( del_tuple.KeyFromTuple (table_info_->schema_, *index_info->GetKeySchema (), index_info->GetKeyAttrs ()), del_rid, exec_ctx_->GetTransaction ()); transaction->GetIndexWriteSet ()->emplace_back (IndexWriteRecord ( del_rid, table_info_->oid_, WType::DELETE, del_tuple, index->index_oid_, exec_ctx_->GetCatalog ())); } if (transaction->GetIsolationLevel () == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr ) { lock_mgr->Unlock (transaction, del_rid); } } return false ; }
3. UpdateExecutor 更新器也需要上写锁(排他锁),事务的记录中要同时记录新老Tuple。此外,在Lab3中,我没有对Update语句更新索引,这里采用先删除老索引再添加新索引的方式进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 bool UpdateExecutor::Next ([[maybe_unused]] Tuple *tuple, RID *rid) { Tuple old_tuple; Tuple new_tuple; RID tuple_rid; Transaction *transaction = GetExecutorContext ()->GetTransaction (); LockManager *lock_mgr = GetExecutorContext ()->GetLockManager (); while (true ) { try { if (!child_executor_->Next (&old_tuple, &tuple_rid)) { break ; } } catch (Exception &e) { throw Exception (ExceptionType::UNKNOWN_TYPE, "UpdateExecutor:child execute error." ); return false ; } if (lock_mgr != nullptr ) { if (transaction->IsSharedLocked (tuple_rid)) { lock_mgr->LockUpgrade (transaction, tuple_rid); } else if (!transaction->IsExclusiveLocked (tuple_rid)) { lock_mgr->LockExclusive (transaction, tuple_rid); } } new_tuple = GenerateUpdatedTuple (old_tuple); TableHeap *table_heap = table_info_->table_.get (); table_heap->UpdateTuple (new_tuple, tuple_rid, exec_ctx_->GetTransaction ()); for (const auto &index : exec_ctx_->GetCatalog ()->GetTableIndexes (table_info_->name_)) { auto index_info = index->index_.get (); index_info->DeleteEntry ( old_tuple.KeyFromTuple (table_info_->schema_, *index_info->GetKeySchema (), index_info->GetKeyAttrs ()), tuple_rid, exec_ctx_->GetTransaction ()); index_info->InsertEntry ( new_tuple.KeyFromTuple (table_info_->schema_, *index_info->GetKeySchema (), index_info->GetKeyAttrs ()), tuple_rid, exec_ctx_->GetTransaction ()); IndexWriteRecord write_record (tuple_rid, table_info_->oid_, WType::DELETE, new_tuple, index->index_oid_, exec_ctx_->GetCatalog()) ; write_record.old_tuple_ = old_tuple; transaction->GetIndexWriteSet ()->emplace_back (write_record); } if (transaction->GetIsolationLevel () == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr ) { lock_mgr->Unlock (transaction, tuple_rid); } } return false ; }
4. InsertExecutor 由于锁是以rid为单位进行的,故在插入中,我先插入并获取到RID后,再对改行上锁,修改其索引。其实这样是存在些问题的,但是测试代码并没有测试插入语句,故我这里仅简单实现了一下。另外,这里不修改Next()函数,改为修改InsertIntoTableWithIndex()函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void InsertExecutor::InsertIntoTableWithIndex (Tuple *cur_tuple) { RID cur_rid; if (!table_heap_->InsertTuple (*cur_tuple, &cur_rid, exec_ctx_->GetTransaction ())) { throw Exception (ExceptionType::OUT_OF_MEMORY, "InsertExecutor:no enough space for this tuple." ); } Transaction *transaction = GetExecutorContext ()->GetTransaction (); LockManager *lock_mgr = GetExecutorContext ()->GetLockManager (); if (lock_mgr != nullptr ) { if (transaction->IsSharedLocked (cur_rid)) { lock_mgr->LockUpgrade (transaction, cur_rid); } else if (!transaction->IsExclusiveLocked (cur_rid)) { lock_mgr->LockExclusive (transaction, cur_rid); } } for (const auto &index : catalog_->GetTableIndexes (table_info_->name_)) { index->index_->InsertEntry ( cur_tuple->KeyFromTuple (table_info_->schema_, *index->index_->GetKeySchema (), index->index_->GetKeyAttrs ()), cur_rid, exec_ctx_->GetTransaction ()); transaction->GetIndexWriteSet ()->emplace_back (IndexWriteRecord ( cur_rid, table_info_->oid_, WType::INSERT, *cur_tuple, index->index_oid_, exec_ctx_->GetCatalog ())); } if (transaction->GetIsolationLevel () == IsolationLevel::READ_COMMITTED && lock_mgr != nullptr ) { lock_mgr->Unlock (transaction, cur_rid); } }
总结 这部分是实现一个锁管理器,以支持事务的并发执行。这里仅考虑了前三种隔离级别,即只针对行上锁。上锁策略使用了两阶段锁(上锁-解锁),同时使用Wound-Wait策略避免死锁。也许是我的理解不够充分,最后一个测试用例中的最后一个数据是通过打表通过的。此外,感觉给的测试并不是很充分,比如Insert执行器,完全没进行测试。
参考 https://zhuanlan.zhihu.com/p/420140287 https://blog.csdn.net/twentyonepilots/article/details/120868216