0%

CMU-15445-BusTub 笔记 - Lab 2: Hash Index

第二个项目要求实现一个可扩展的哈希表。

实验指导书

第二个项目是为ButsTub DBMS实现一个磁盘支持的哈希表。这个哈希表负责快速搜索,而不必搜索数据库表中的每一条记录。

这里使用可扩展的哈希模式来实现一个哈希表。

索引包含了一个directory page,该目录页包含指向bucket page的指针。该表将通过第一个实现中的缓冲池访问页面。该表包含的目录页存储了table和bucket的所有元数据。这个哈希表需要支持对满/空bucket的拆分和合并,以及在全局深度必须更改时支持directory的扩展和收缩。

一、页布局

该哈希表应该通过DBMS的BufferPoolManager来访问,这意味着你不能分配内存来存储信息。所有内容都必须存储在磁盘页中,以便能够从DiskManager中读取或者写入他们。如果创建一个哈希表,并将其页写入磁盘,然后重启DBMS,则应该能够在重启之后从磁盘加载回哈希表。

为了支持在页面顶部读写哈希表bucket,你将要实现两个Page类来存储哈希表的数据。这意味着要教你如何从BufferPoolManager中以页面的形式分配内存。

1. Hash Table Directory Page

该类保存哈希表的所有元数据。它被划分为如下表所示的字段:

变量名 大小 描述
page_id_ 4 bytes 自身的pageid
lsn_ 4 bytes 日志流水号(实验4将用到)
global_depth_ 4 bytes 目录的全局深度
local_depths_ 512 bytes 每个bucket的局部深度的数组(Uint_8)
bucket_page_ids_ 2048 bytes bucket的pageid的数组

bucket_page_ids_数组将桶id映射到这里,bucket_page_ids_中的第i个元素是第i个桶的page_id。

2. Hash Table Bucket Page

他保存了三个变量:
occupied_ : 如果array_的第i个索引曾经被占用过,则occued_的第i位为1。
readable_ : 如果array_的第i个索引包含一个可读值,则readable_的第i位为1。
array_ : 保存key-value对的数组。

Bucket Page中可用的槽数取决于存储的Key和Value的类型,您只需要支持定长的Key和Value即可。在单个实例中,他们的大小都是不变的,但是不同的实例中则有可能改边。(例如,哈希表#1可以有32位键,而哈希表#2可以有64位键)。

每个哈希表的Directory或Bucket页对应于从缓冲池中获取的内存内的内容(即data_)。每次尝试读写页面的时候,需要使用唯一的pageid从缓冲池中获取页面,然后将他们重新转换为Directory或Bucket页,并且要在任何读写操作之后Unpin这个页面。

您只需要完成以下函数即可:
Bucket Page: - Insert - Remove - IsOccupied - IsReadable - KeyAt - ValueAt
Directory Page: - GetGlobalDepth - IncrGlobalDepth - SetLocalDepth - SetBucketPageId - GetBucketPageId

二、实现哈希表

您将实现一个可扩展的哈希表,它需要支持插入(Insert),点搜索(GetValue)和删除(Remove)。在头文件和代码中有很多Helper functions的实现和说明。您唯一严格的API要求是遵守Insert、GetValue和Remove,此外,还不能修改VerifyIntegrity函数。

您的哈希表必须同时支持唯一键和非唯一键。同一键不允许有重复的值。这意味着(key_0, value_0)和(key_0, value_1)可以存在于同一个哈希表中,但(key_0, value_0)和(key_0, value_0)不能存在于同一个哈希表中。如果Insert方法试图插入现有的键值对,则只返回false。

所有函数都是以模板形式给出的,Key和Value的类型由模板定义,Key的比较类型由传入的cmp函数定义,Value则可以使用简单的==来比较。

1. 可扩展哈希表的细节

这个实现要求您实现Bucket的拆分/合并以及目录的增长/收缩。一些可扩展哈希的实现跳过了Bucket的合并,因为它可能在某些场景中导致抖动。不过我们这里实现它是为了让你对数据结构有更深的理解,并提供更多的机会来学习如何管理latches,locks以及页面操作等。

1.1 Directory Indexing

当插入哈希索引时,您将希望使用最低有效位来索引目录。当然,正确使用最高有效位是可能的,但是使用最低有效位会使目录展开操作简单得多。

1.2 Splitting Buckets

如果没有插入的空间,就必须将Bucket一分为二。如果你觉得这样更容易的话,你也可以在桶刚满的时候执行split。但是,参考的解决方案是只有当插入会造成页面溢出的时候才进行拆分。因此,您可能会发现所提供的API更适合这种方法。

1.3 Merging Buckets

当Bucket变空之后,必须要尝试合并他们。通过检查Bucket的占用及其split image,可以实现更积极地合并。但这些昂贵的检查和额外的合并会增加抖动。
为了保持相对简单,请使用以下规则进行合并:

  1. 只合并空bucket
  2. 只有在split image具有相同的local depth时,才将bucket和他们的split image。

如果您对split image感觉到困惑,请查阅算法和代码文档。

1.4 Directory Growing

这部分没什么规则,你只能扩展深度或者不扩展。

1.5 Directory Shrinking

只有当每个bucket的local depth严格小于global depth时,才执行目录收缩。您可能看到其他关于目录收缩的测试,但这个测试并不重要,因为我们将local depth保存在目录页中。

2. 性能

一个重要的有关性能的细节是只在需要时使用write lock和latches。经常使用write locks将会导致gradscope超时。

此外,一个潜在的优化是在bucket页面上考虑您自己的扫描方式,这可以在某些情况下避免重复扫描。您将发现检查bucket页面的许多内容通常需要进行全面扫描,因此您可以一次性收集所有这些信息。

三、并发控制

到目前为止,您可以假设您的哈希表只支持单线程执行。在最后一个任务中,您将修改您的实现,使其支持多个线程同时读/写表。

您需要在每个bucket中使用latches,以便当一个线程写入bucket时,其他线程不会同时读取或修改该索引。您还应该允许多个读取器同时读取同一个bucket。

当您需要分割或合并桶时,以及全局深度发生变化时,您将需要latch整个哈希表。

在这个项目中有两个锁需要关注。第一个是extendible_hash_table.h中的table_latch_,它接受哈希表上的锁。它来自src/include/common/ RWLatch .h中的RWLatch类。正如你在代码中看到的,它是由std::mutex实现的。第二个是src/include/storage/page.h中的内置页面锁定功能。这是您必须使用的,以保护您的桶页。注意,要获取table_latch_上的读锁,需要从RWLatch.h中调用RLock,但是要获取bucket页上的读锁,必须将< page *>重新解释为一个页指针,并从page.h中调用RLatch方法。

实验过程

关于可扩展的哈希,可以看看如下两个文章:
https://www.geeksforgeeks.org/extendible-hashing-dynamic-approach-to-dbms/
http://www.mathcs.emory.edu/~cheung/Courses/554/Syllabus/3-index/extensible-hashing.html

我的理解就是结合传统的哈希表与树,去实现一个可扩展/收缩、每一层次使用哈希检索的树。为什么不直接用哈希?感觉这里是考虑了内存的容量,通过索引的方式去读硬盘,避免大量占用内存。

一、页布局

这部分主要是为下一个任务做一些准备。可扩展哈希表包含Directory和Bucket,故要先为这两个类实现一些操作,供下一个任务调用。

1. Directory Page

先关注hash_table_directory_page.cpp文件,这里主要是对页目录的一些操作。
先挑几个重点写

1.1 IncrGlobalDepth

IncrGlobalDepth函数和给好的Decr不太一样,这里在扩展完之后需要再复制一份数据。(每扩展一次,数组中使用的部分x2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// storage/page/hash_table_directory_page.cpp
/**
* Increment the global depth of the directory
*/
void HashTableDirectoryPage::IncrGlobalDepth() {
assert(global_depth_ < MAX_BUCKET_DEPTH);
// 这里主要是需要将bucket_page_ids_和local_depths_的数据在现有数组的末尾再复制一份
// 扩容时没有进行数据迁移,两个bucket对应一个page
int org_num = Size();
for (int org_index = 0, new_index = org_num; org_index < org_num; new_index++, org_index++) {
bucket_page_ids_[new_index] = bucket_page_ids_[org_index];
local_depths_[new_index] = local_depths_[org_index];
}
global_depth_++;
}
1.2 CanShrink

CanShrink函数用于判断目录能否收缩,取决于是否每个localdepth都比globaldepth小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// storage/page/hash_table_directory_page.cpp
/**
* @return true if the directory can be shrunk
*/
bool HashTableDirectoryPage::CanShrink() {
// 整个Directory能不能收缩取决于每个localdepth是否都比globaldepth小
// 循环判断即可
for (uint32_t i = 0; i < Size(); i++) {
if (local_depths_[i] >= global_depth_) {
return false;
}
}
return true;
}
1.3 Other

其余函数跟注释来就行,基本都是Get和Set或者简单的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// storage/page/hash_table_directory_page.cpp
uint32_t HashTableDirectoryPage::GetGlobalDepthMask() {
// 这里根据注释中的示例写就行
// Example: 当global_depth_是3的时候
// 0000...000001 << global_depeth_ = 0000...01000
// 再减1即可
return ((1 << global_depth_) - 1);
}

page_id_t HashTableDirectoryPage::GetBucketPageId(uint32_t bucket_idx) { return bucket_page_ids_[bucket_idx]; }

void HashTableDirectoryPage::SetBucketPageId(uint32_t bucket_idx, page_id_t bucket_page_id) {
bucket_page_ids_[bucket_idx] = bucket_page_id;
}

uint32_t HashTableDirectoryPage::Size() {
// 2 ^ global_depth_
return (1 << global_depth_);
}

uint32_t HashTableDirectoryPage::GetLocalDepth(uint32_t bucket_idx) { return local_depths_[bucket_idx]; }

void HashTableDirectoryPage::SetLocalDepth(uint32_t bucket_idx, uint8_t local_depth) {
assert(local_depth <= global_depth_);
local_depths_[bucket_idx] = local_depth;
}

void HashTableDirectoryPage::IncrLocalDepth(uint32_t bucket_idx) {
assert(local_depths_[bucket_idx] < global_depth_);
local_depths_[bucket_idx]++;
}

void HashTableDirectoryPage::DecrLocalDepth(uint32_t bucket_idx) { local_depths_[bucket_idx]--; }

uint32_t HashTableDirectoryPage::GetSplitImageIndex(uint32_t bucket_idx) {
// Example: 当对应的localdepth是3时:
// 1<<(3-1) = 0000....00100
// 具体用途后面再说
return bucket_idx ^ (1 << (local_depths_[bucket_idx] - 1));
}

2. Bucket page

然后关注hash_table_bucket_page.cpp文件,这里是关于bucket的操作。

2.1 Occupied & Readable

先实现一些底层操作,包含occupied和readable的判定与设置。
核心都是以位为单位,在char类型中取得对应的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// storage/page/hash_table_bucket_page.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsOccupied(uint32_t bucket_idx) const {
// 使用位运算,判断对应位是否为1
uint8_t c = static_cast<uint8_t>(occupied_[bucket_idx / 8]);
return (c & (1 << (bucket_idx % 8))) > 0;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::SetOccupied(uint32_t bucket_idx) {
// 将occupied对应位设置为1
uint8_t c = static_cast<uint8_t>(occupied_[bucket_idx / 8]);
c |= (1 << (bucket_idx % 8));
occupied_[bucket_idx / 8] = static_cast<char>(c);
}

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsReadable(uint32_t bucket_idx) const {
// 使用位运算,判断对应位是否为1
uint8_t c = static_cast<uint8_t>(readable_[bucket_idx / 8]);
return (c & (1 << (bucket_idx % 8))) > 0;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::SetReadable(uint32_t bucket_idx) {
// 将readable对应位设置为1
uint8_t c = static_cast<uint8_t>(readable_[bucket_idx / 8]);
c |= (1 << (bucket_idx % 8));
readable_[bucket_idx / 8] = static_cast<char>(c);
}
2.2 IsFull & Numreadable & IsEmpty

然后做一些针对全体成员的位运算的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// storage/page/hash_table_bucket_page.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsFull() {
u_int8_t mask = 255;
// 先以char为单位
size_t i_num = BUCKET_ARRAY_SIZE / 8;
for (size_t i = 0; i < i_num; i++) {
uint8_t c = static_cast<uint8_t>(readable_[i]);
if ((c & mask) != mask) {
return false;
}
}

// 最后还要看剩余的
size_t i_remain = BUCKET_ARRAY_SIZE % 8;
if (i_remain > 0) {
uint8_t c = static_cast<uint8_t>(readable_[i_num]);
for (size_t j = 0; j < i_remain; j++) {
if ((c & 1) != 1) {
return false;
}
c >>= 1;
}
}
return true;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
uint32_t HASH_TABLE_BUCKET_TYPE::NumReadable() {
// 要分别对每个char中的每位做判断
uint32_t num = 0;

// 先以char为单位
size_t i_num = BUCKET_ARRAY_SIZE / 8;
for (size_t i = 0; i < i_num; i++) {
uint8_t c = static_cast<uint8_t>(readable_[i]);
for (uint8_t j = 0; j < 8; j++) {
// 取最低位判断
if ((c & 1) > 0) {
num++;
}
c >>= 1;
}
}

// 最后还要看剩余的
size_t i_remain = BUCKET_ARRAY_SIZE % 8;
if (i_remain > 0) {
uint8_t c = static_cast<uint8_t>(readable_[i_num]);
for (size_t j = 0; j < i_remain; j++) {
// 取最低位判断
if ((c & 1) == 1) {
num++;
}
c >>= 1;
}
}

return num;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsEmpty() {
// readable数组使用char存储,但这里不需要挨个判断,只需要有其中一个不符合条件就可以返回false
// 直接用sizeof运算获取长度
u_int8_t mask = 255;
for (size_t i = 0; i < sizeof(readable_) / sizeof(readable_[0]); i++) {
if ((readable_[i] & mask) > 0) {
return false;
}
}
return true;
}
2.3 GetValue & Insert & Remove & Other

最后是GetValue,Insert,Remove等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// storage/page/hash_table_bucket_page.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::GetValue(KeyType key, KeyComparator cmp, std::vector<ValueType> *result) {
// 将所有key符合的都插入到vector数组中
bool ret = false;
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; i++) {
if (IsReadable(i) && cmp(key, array_[i].first) == 0) {
result->push_back(array_[i].second);
ret = true;
}
}
return ret;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::Insert(KeyType key, ValueType value, KeyComparator cmp) {
size_t available = -1;
// 遍历所有位置,找到一个可以插入的位置,并且确定有无完全相同的K/V,有则不插入
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; i++) {
if (IsReadable(i)) {
if(cmp(key, array_[i].first) == 0&& value==array_[i].second]){
return false;
}
} else if (available = -1) {
available = i;
}
}

// 遍历完看看找没找到空位
if (available == -1) {
return false;
}

// 插入数据
array_[available] = MappingType(key, value);
SetOccupied(available);
SetReadable(available);
return true;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::Remove(KeyType key, ValueType value, KeyComparator cmp) {
// 直接找,找到了调用RemoveAt即可
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; i++) {
if (IsReadable(i)) {
if (cmp(key, array_[i].first) == 0 && value == array_[i].second) {
RemoveAt(i);
return true;
}
}
}
return false;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
KeyType HASH_TABLE_BUCKET_TYPE::KeyAt(uint32_t bucket_idx) const {
return array_[bucket_idx].first;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
ValueType HASH_TABLE_BUCKET_TYPE::ValueAt(uint32_t bucket_idx) const {
return array_[bucket_idx].second;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::RemoveAt(uint32_t bucket_idx) {
// 将其位置的readable设置为0即可
uint8_t c = static_cast<uint8_t>(readable_[bucket_idx / 8]);
c &= (~(1 << (bucket_idx % 8)));
readable_[bucket_idx / 8] = static_cast<char>(c);
}

二、实现哈希表

这里主要是可扩展哈希表的实现了,即extendible_hash_table.cpp。
这里先不考虑锁,下一节再考虑

Split image类似一个bucket的兄弟bucket,即他在装满之后分裂出来的那个bucket(也可以说两个bucket互为split image)。故在分裂合并的时候要寻找对应的split bucket,这就是前面GetSplitImageIndex函数的作用。

1. 构造函数

该类的构造函数主要是对directory_page_id_,即当前directory对应的页面ID初始化。由于还没有bucket,故要初始化为无效值。

1
2
3
4
5
6
7
template <typename KeyType, typename ValueType, typename KeyComparator>
HASH_TABLE_TYPE::ExtendibleHashTable(const std::string &name, BufferPoolManager *buffer_pool_manager,
const KeyComparator &comparator, HashFunction<KeyType> hash_fn)
: buffer_pool_manager_(buffer_pool_manager), comparator_(comparator), hash_fn_(std::move(hash_fn)) {
// implement me!
directory_page_id_ = INVALID_PAGE_ID;
}

2. Helper Functions

根据注释,我们首先实现一些helper函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// container/hash/extendible_hash_table.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
uint32_t HASH_TABLE_TYPE::KeyToDirectoryIndex(KeyType key, HashTableDirectoryPage *dir_page) {
// 参考注释,直接用掩码做与运算即可
return Hash(key) & dir_page->GetGlobalDepthMask();
}

template <typename KeyType, typename ValueType, typename KeyComparator>
page_id_t HASH_TABLE_TYPE::KeyToPageId(KeyType key, HashTableDirectoryPage *dir_page) {
// 调用现有函数即可
return dir_page->GetBucketPageId(KeyToDirectoryIndex(key, dir_page));
}

/**
* Fetches the directory page from the buffer pool manager.
* 从BufferPoolManager中得到directory所在的Page,Page中是一个Directory对象
* 如果还没有directory的话,那就创建一个
*
* @return a pointer to the directory page
*/
template <typename KeyType, typename ValueType, typename KeyComparator>
HashTableDirectoryPage *HASH_TABLE_TYPE::FetchDirectoryPage() {
HashTableDirectoryPage *ret;
// 如果不可用,则创建一个
if (directory_page_id_ == INVALID_PAGE_ID) {
// 先创建DirectoryPage
page_id_t new_page_id_dir;
Page *page = buffer_pool_manager_->NewPage(&new_page_id_dir);
assert(page != nullptr);
ret = reinterpret_cast<HashTableDirectoryPage *>(page->GetData());
directory_page_id_ = new_page_id_dir;
ret->SetPageId(directory_page_id_);
// 然后创建为空的directory创建第一个bucket
page_id_t new_page_id_buc;
page = nullptr;
page = buffer_pool_manager_->NewPage(&new_page_id_buc);
assert(page != nullptr);
ret->SetBucketPageId(0, new_page_id_buc);
// 最后Unpin这两个页面
assert(buffer_pool_manager_->UnpinPage(new_page_id_dir, true));
assert(buffer_pool_manager_->UnpinPage(new_page_id_buc, true));
}

// 从buffer中获取页面
assert(directory_page_id_ != INVALID_PAGE_ID);
Page *page = buffer_pool_manager_->FetchPage(directory_page_id_);
assert(page != nullptr);
ret = reinterpret_cast<HashTableDirectoryPage *>(page->GetData());
return ret;
}

/**
* Fetches the a bucket page from the buffer pool manager using the bucket's page_id.
* 使用pageid从BufferPoolManager中得到一个Page,其GetData就是bucket对象。
* 感觉这个名字不准确,应该是直接得到Page的data,不是Page
*
* @param bucket_page_id the page_id to fetch
* @return a pointer to a bucket page
*/
template <typename KeyType, typename ValueType, typename KeyComparator>
HASH_TABLE_BUCKET_TYPE *HASH_TABLE_TYPE::FetchBucketPage(page_id_t bucket_page_id) {
Page *page = buffer_pool_manager_->FetchPage(bucket_page_id);
assert(page != nullptr);
return reinterpret_cast<HASH_TABLE_BUCKET_TYPE *>(page->GetData());
}

然后是搜索函数,这里并不复杂,一步步找到对应的bucket后,直接调用bucket中写到的GetValue即可。记得Unpin前面用过的页面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// container/hash/extendible_hash_table.cpp
/**
* Performs a point query on the hash table.
*
* @param transaction the current transaction
* @param key the key to look up
* @param[out] result the value(s) associated with a given key
* @return the value(s) associated with the given key
*/
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::GetValue(Transaction *transaction, const KeyType &key, std::vector<ValueType> *result) {
HashTableDirectoryPage *dir_page = FetchDirectoryPage();
page_id_t bucket_page_id = KeyToPageId(key, dir_page);
HASH_TABLE_BUCKET_TYPE *bucket = FetchBucketPage(bucket_page_id);

// 读取数据
bool ret = bucket->GetValue(key, comparator_, result);

// 记得Unpin
assert(buffer_pool_manager_->UnpinPage(bucket_page_id, false));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));

return ret;
}

3. Insertion

插入函数比较困难,不过主要困难在分裂上。先把不需要分裂的部分搞定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// container/hash/extendible_hash_table.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::Insert(Transaction *transaction, const KeyType &key, const ValueType &value) {
HashTableDirectoryPage *dir_page = FetchDirectoryPage();
page_id_t bucket_page_id = KeyToPageId(key, dir_page);
HASH_TABLE_BUCKET_TYPE *bucket = FetchBucketPage(bucket_page_id);

// 如果bucket没满,直接插入即可
if (!bucket->IsFull()) {
bool ret = bucket->Insert(key, value, comparator_);
assert(buffer_pool_manager_->UnpinPage(bucket_page_id, true));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return ret;
}
// 满了要扩容
assert(buffer_pool_manager_->UnpinPage(bucket_page_id, false));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return SplitInsert(transaction, key, value);
}

扩容的步骤如下:

  1. 检查状态,看能不能扩容。
  2. 根据需要决定是否扩容Directory。
  3. 找到当前bucket,将数据保存下来,然后初始化它。
  4. 创建并初始化一个image bucket,这里将源bucket称为split bucket。
  5. 重新将数据分类并分别插入两个bucket中。
  6. 将所有同一级的bucket设置为相同的local depth和page。
  7. Unpin用过的页面,然后重新尝试调用Insert函数。

这里发现前面的bucket page实现有缺失,故补充了两个函数。这两个函数不复杂,直接看函数名就知道该干什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// storage/page/hash_table_bucket_page.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
MappingType *HASH_TABLE_BUCKET_TYPE::GetArrayCopy() {
uint32_t num = NumReadable();
MappingType *copy = new MappingType[num];
for (uint32_t i = 0, index = 0; i < BUCKET_ARRAY_SIZE; i++) {
if (IsReadable(i)) {
copy[index++] = array_[i];
}
}
return copy;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::Reset() {
memset(occupied_, 0, sizeof(occupied_));
memset(readable_, 0, sizeof(readable_));
memset(array_, 0, sizeof(array_));
}

现在可以实现SpiltInsert方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// container/hash/extendible_hash_table.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::SplitInsert(Transaction *transaction, const KeyType &key, const ValueType &value) {
HashTableDirectoryPage *dir_page = FetchDirectoryPage();
int64_t split_bucket_index = KeyToDirectoryIndex(key, dir_page);
uint32_t split_bucket_depth = dir_page->GetLocalDepth(split_bucket_index);

// 容量满了,不能扩了
if (split_bucket_depth >= MAX_BUCKET_DEPTH) {
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return false;
}

// 看看Directory需不需要扩容
if (split_bucket_depth == dir_page->GetGlobalDepth()) {
dir_page->IncrGlobalDepth();
}

// 增加local depth
dir_page->IncrLocalDepth(split_bucket_index);

// 获取当前bucket,先将数据保存下来,然后重新初始化它
page_id_t split_bucket_page_id = KeyToPageId(key, dir_page);
HASH_TABLE_BUCKET_TYPE *split_bucket = FetchBucketPage(split_bucket_page_id);
uint32_t origin_array_size = split_bucket->NumReadable();
MappingType *origin_array = split_bucket->GetArrayCopy();
split_bucket->Reset();

// 创建一个image bucket,并初始化该image bucket
page_id_t image_bucket_page_id;
Page *image_bucket_page = buffer_pool_manager_->NewPage(&image_bucket_page_id);
assert(image_bucket_page != nullptr);
HASH_TABLE_BUCKET_TYPE *image_bucket = GetBucketPageData(image_bucket_page);
uint32_t split_image_bucket_index = dir_page->GetSplitImageIndex(split_bucket_index);
dir_page->SetLocalDepth(split_image_bucket_index, dir_page->GetLocalDepth(split_bucket_index));
dir_page->SetBucketPageId(split_image_bucket_index, image_bucket_page_id);

// 重新插入数据
for (uint32_t i = 0; i < origin_array_size; i++) {
MappingType tmp = origin_array[i];
uint32_t target_bucket_index = Hash(tmp.first) & dir_page->GetLocalDepthMask(split_bucket_index);
page_id_t target_bucket_index_page = dir_page->GetBucketPageId(target_bucket_index);
assert(target_bucket_index_page == split_bucket_page_id || target_bucket_index_page == image_bucket_page_id);
// 这里根据新计算的hash结果决定插入哪个bucket
if (target_bucket_index_page == split_bucket_page_id) {
assert(split_bucket->Insert(tmp.first, tmp.second, comparator_));
} else {
assert(image_bucket->Insert(tmp.first, tmp.second, comparator_));
}
}
delete[] origin_array;

// 将所有同一级的bucket设置为相同的local depth和page
uint32_t diff = 1 << dir_page->GetLocalDepth(split_bucket_index);
for (uint32_t i = split_bucket_index; i >= diff; i -= diff) {
dir_page->SetBucketPageId(i, split_bucket_page_id);
dir_page->SetLocalDepth(i, dir_page->GetLocalDepth(split_bucket_index));
}
for (uint32_t i = split_bucket_index; i < dir_page->Size(); i += diff) {
dir_page->SetBucketPageId(i, split_bucket_page_id);
dir_page->SetLocalDepth(i, dir_page->GetLocalDepth(split_bucket_index));
}
for (uint32_t i = split_image_bucket_index; i >= diff; i -= diff) {
dir_page->SetBucketPageId(i, image_bucket_page_id);
dir_page->SetLocalDepth(i, dir_page->GetLocalDepth(split_bucket_index));
}
for (uint32_t i = split_image_bucket_index; i < dir_page->Size(); i += diff) {
dir_page->SetBucketPageId(i, image_bucket_page_id);
dir_page->SetLocalDepth(i, dir_page->GetLocalDepth(split_bucket_index));
}

// Unpin
assert(buffer_pool_manager_->UnpinPage(split_bucket_page_id, true));
assert(buffer_pool_manager_->UnpinPage(image_bucket_page_id, true));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), true));

// 最后重新尝试插入
return Insert(transaction, key, value);
}

4. Remove

删除部分也不困难,找到Bucket后调用其Remove方法即可。在删除后还需要判断一下是不是当前bucket已经空了,如果是就Merge当前bucket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// container/hash/extendible_hash_table.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::Remove(Transaction *transaction, const KeyType &key, const ValueType &value) {
// 先找到bucket
HashTableDirectoryPage *dir_page = FetchDirectoryPage();
page_id_t bucket_page_id = KeyToPageId(key, dir_page);
uint32_t bucket_index = KeyToDirectoryIndex(key, dir_page);
HASH_TABLE_BUCKET_TYPE *bucket = FetchBucketPage(bucket_page_id);

// 删除Key-value
bool ret = bucket->Remove(key, value, comparator_);

// Unpin
assert(buffer_pool_manager_->UnpinPage(bucket_page_id, true));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));

// 为空则合并
if (bucket->IsEmpty()) {
Merge(transaction, bucket_index);
}

return ret;
}

5. Merge

Merge函数用于合并两个bucket。由于这是一个私有函数,故为了方便使用我修改了一下函数定义。传入的Bucket_index是要被删除的bucket的index。我们这里删除它,然后把它的指向都修改为其split image即可。
主要过程如下:

  1. 根据注释说明,检查三个条件。若有任意一个不符合,就不进行Merge。
  2. 直接删除该Bucket,通过删除其页来实现。
  3. 遍历整个Directory,将所有指向被删除bucket的指向都修改为指向其split image。
  4. 尝试收缩Directory,调用directory的方法即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// container/hash/extendible_hash_table.cpp
template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_TYPE::Merge(Transaction *transaction, uint32_t target_bucket_index) {
HashTableDirectoryPage *dir_page = FetchDirectoryPage();
page_id_t target_bucket_page_id = dir_page->GetBucketPageId(target_bucket_index);
uint32_t image_bucket_index = dir_page->GetSplitImageIndex(target_bucket_index);

// local depth为0说明已经最小了,不收缩
uint32_t local_depth = dir_page->GetLocalDepth(target_bucket_index);
if (local_depth == 0) {
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return;
}

// 如果该bucket与其split image深度不同,也不收缩
if (local_depth != dir_page->GetLocalDepth(image_bucket_index)) {
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return;
}

// 如果target bucket不为空,则不收缩
HASH_TABLE_BUCKET_TYPE *target_bucket = FetchBucketPage(target_bucket_page_id);
if (!target_bucket->IsEmpty()) {
assert(buffer_pool_manager_->UnpinPage(target_bucket_page_id, false));
assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), false));
return;
}

// 删除target bucket,此时该bucket已经为空
assert(buffer_pool_manager_->UnpinPage(target_bucket_page_id, false));
assert(buffer_pool_manager_->DeletePage(target_bucket_page_id));

// 设置target bucket的page为split image的page,即合并target和split
page_id_t image_bucket_page_id = dir_page->GetBucketPageId(image_bucket_index);
dir_page->SetBucketPageId(target_bucket_index, image_bucket_page_id);
dir_page->DecrLocalDepth(target_bucket_index);
dir_page->DecrLocalDepth(image_bucket_index);
assert(dir_page->GetLocalDepth(target_bucket_index) == dir_page->GetLocalDepth(image_bucket_index));

// 遍历整个directory,将所有指向target bucket page的bucket全部重新指向split image bucket的page
for (uint32_t i = 0; i < dir_page->Size(); i++) {
if (dir_page->GetBucketPageId(i) == target_bucket_page_id || dir_page->GetBucketPageId(i) == image_bucket_page_id) {
dir_page->SetBucketPageId(i, image_bucket_page_id);
dir_page->SetLocalDepth(i, dir_page->GetLocalDepth(target_bucket_index));
}
}

// 尝试收缩Directory
// 这里要循环,不能只收缩一次
while (dir_page->CanShrink()) {
dir_page->DecrGlobalDepth();
}

assert(buffer_pool_manager_->UnpinPage(dir_page->GetPageId(), true));
}

三、并发控制

1. 修改FetchBucketPage

根据实验说明中的描述,需要对页面单独上锁,故需要拆分FetchBucketPage函数。先获取Page对象指针,再根据指针获取Bucket。

1
2
3
// include/container/hash/extendible_hash_table.h
Page *FetchBucketPage(page_id_t bucket_page_id);
HASH_TABLE_BUCKET_TYPE *GetBucketPageData(Page *page);

修改两个函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// container/hash/extendible_hash_table.cpp
/**
* Fetches the a bucket page from the buffer pool manager using the bucket's page_id.
* 使用pageid从BufferPoolManager中得到一个Page,其GetData就是bucket对象。
* 修改,添加一个函数,修改Fetch函数的签名。将Page与data分开处理
*
* @param bucket_page_id the page_id to fetch
* @return a pointer to a bucket page
*/
template <typename KeyType, typename ValueType, typename KeyComparator>
Page *HASH_TABLE_TYPE::FetchBucketPage(page_id_t bucket_page_id) {
Page *page = buffer_pool_manager_->FetchPage(bucket_page_id);
assert(page != nullptr);
return page;
}

template <typename KeyType, typename ValueType, typename KeyComparator>
HASH_TABLE_BUCKET_TYPE *HASH_TABLE_TYPE::GetBucketPageData(Page *page) {
return reinterpret_cast<HASH_TABLE_BUCKET_TYPE *>(page->GetData());
}

后面调用这两个函数的部分均需要修改。以GetValue中为例:

1
2
3
// container/hash/extendible_hash_table.cpp
Page *bucket_page = FetchBucketPage(bucket_page_id);
HASH_TABLE_BUCKET_TYPE *bucket = GetBucketPageData(bucket_page);

2. 锁

这部分主要是在原来的代码中添加一些锁,故不再大段粘贴代码。修改后的代码可参见Github。

在FetchDirectoryPage中,需要对创建页的过程上锁,避免并发带来的重复创建。
先在类中定义一个锁,然后调用它即可。

此外,前面一些函数的逻辑还要调整一下,主要是为释放锁考虑。

GetValue需要对类和页面上读锁。
Insert需要对类上读锁,对页面上写锁,因为这里只对一个Bucket操作。
SplitInsert需要对类和页面上写锁,因为这里涉及到了Directory的操作。
Remove需要对类上读锁,对页面上写锁,同理,只对一个Bucket操作。
Merge需要对类上写锁,对页面上读锁,这里不需要操作Bucket,只修改Directory。

总结

本实验主要是实现一个使用可扩展哈希实现的索引,在做本实验之前需要研究明白可扩展哈希的原理。

  1. 一个可扩展哈希表有一个Directory,其中存了一些指向Bucket的指针(本实验通过Pageid实现)。Bucket中负责存数据,当Bucket满了之后要对满的Bucket进行分裂。当Bucket为空后要合并Bucket。
  2. 扩展收缩本质上是操作Bucket对应的Page,不同Bucket可以指向同一个Page。扩展时就会让满的Bucket与其Split Image分别指向不同的Page,收缩意味着将两个bucket指向一个Page。所以扩展和收缩过程都是对Bucket中保存的Pageid和Page中保存的数据进行操作。
  3. 我们实现的比较简单,收缩只在其中一个bucket为空时进行,也不会主动扫描寻找可收缩的。这在一定程度上也避免了频繁扩展收缩带来的性能问题。
  4. 关于锁,主要是为了保证多线程安全,本地测试不加锁也能过。

后记:调Lab2的时候才发现Lab1忘记git commit了!=_=….

参考

https://www.inlighting.org/archives/cmu-15-445-notes/