LevelDB SSTable模块

SSTable模块

SSTable(Sorted Strings Table,有序字符串表),在各种存储引擎中得到了广泛的使用,包括 LevelDB、HBase、Cassandra 等。SSTable 会根据 Key 进行排序后保存一系列的 K-V 对,这种方式不仅方便进行范围查找,而且便于对 K-V 对进行更加有效的压缩。

SSTable Format

SSTable 文件由一个个块组成,块中可以保存数据、数据索引、元数据或者元数据索引。整体的文件格式如下图:

如上图,SSTable 文件整体分为 4 个部分:

  • Data Block(数据区域):保存具体的键-值对数据。
  • Meta Block(元数据区域):保存元数据,例如布隆过滤器。
  • Index Block(索引区域):分为数据索引和元数据索引。
    • 数据索引:数据索引块中的键为前一个数据块的最后一个键(即一个数据块中最大的键,因为键是有序排列保存的)与后一个数据块的第一个键(即一个数据块中的最小键)的最短分隔符。
    • 元数据索引:元数据索引块可指示如何查找该布隆过滤器的数据。
  • File Footer(尾部):总大小为48个字节。

BlockHandle

BlockHandle在SSTable中是经常使用的一个结构,其定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// https://github.com/google/leveldb/blob/master/table/format.h

class BlockHandle {
  public:
  // Maximum encoding length of a BlockHandle
  enum { kMaxEncodedLength = 10 + 10 };

  BlockHandle();

  // The offset of the block in the file.
  uint64_t offset() const { return offset_; }
  void set_offset(uint64_t offset) { offset_ = offset; }

  // The size of the stored block
  uint64_t size() const { return size_; }
  void set_size(uint64_t size) { size_ = size; }

  void EncodeTo(std::string* dst) const;
  Status DecodeFrom(Slice* input);

  private:
  uint64_t offset_;
  uint64_t size_;
};

BlockHandler 本质就是封装了 offset 和 size,用于定位某些区域。

Block Format

SSTable 中一个块默认大小为 4 KB,由 4 部分组成:

  • 键-值对数据:即我们保存到 LevelDB 中的多组键-值对。
  • 重启点数据:最后 4 字节为重启点的个数,前边部分为多个重启点,每个重启点实际保存的是偏移量。并且每个重启点固定占据 4 字节的空间。
  • 压缩类型:在 LevelDB 的 SSTable 中有两种压缩类型:
    • kNoCompression:没有压缩。
    • kSnappyCompression:Snappy压缩。
  • 校验数据:4 字节的 CRC 校验字段。

Block读写流程

生成Block

块生成主要在 BlockBuilder 中实现,下面先看看它的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// https://github.com/google/leveldb/blob/master/table/block_builder.h

class BlockBuilder {
  public:
  explicit BlockBuilder(const Options* options);

  BlockBuilder(const BlockBuilder&) = delete;
  BlockBuilder& operator=(const BlockBuilder&) = delete;

  void Reset();

  void Add(const Slice& key, const Slice& value);

  Slice Finish();

  size_t CurrentSizeEstimate() const;

  bool empty() const { return buffer_.empty(); }

  private:
  const Options* options_;
  std::string buffer_;              // Destination buffer
  std::vector<uint32_t> restarts_;  // Restart points
  int counter_;                     // Number of entries emitted since restart
  bool finished_;                   // Has Finish() been called?
  std::string last_key_;
};
  • 成员变量
    • options_:在 BlockBuilder 类构造函数中传入,表示一些配置选项。
    • buffer_:块的内容,所有的键-值对都保存到 buffer_ 中。
    • restarts_:每次开启新的重启点后,会将当前 buffer_ 的数据长度保存到 restarts_ 中,当前 buffer_ 中的数据长度即为每个重启点的偏移量。
    • counter_:开启新的重启点之后加入的键-值对数量,默认保存 16 个键-值对,之后会开启一个新的重启点。
    • finished_:指明是否已经调用了 Finish 方法,BlockBuilder 中的 Add 方法会将数据保存到各个成员变量中,而 Finish 方法会依据成员变量的值生成一个块。
    • last_key_:上一个保存的键,当加入新键时,用来计算和上一个键的共同前缀部分。

介绍完了结构,下面来看具体的生成方法。当需要保存一个键-值对时,需要调用 BlockBuilder 类中的 Add 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// https://github.com/google/leveldb/blob/master/table/block_builder.cc

void BlockBuilder::Add(const Slice& key, const Slice& value) {
  //保存上一个加入的key
  Slice last_key_piece(last_key_);

  assert(!finished_);
  assert(counter_ <= options_->block_restart_interval);
  assert(buffer_.empty()  // No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);
  size_t shared = 0;

  //判断counter_是否大于block_restart_interval
  if (counter_ < options_->block_restart_interval) {
      const size_t min_length = std::min(last_key_piece.size(), key.size());
      //计算相同前缀的长度
      while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
          shared++;
      }
  } else {
      //如果键-值对数量超过block_restart_interval,则开启新的重启点,清空计数器
      restarts_.push_back(buffer_.size());
      counter_ = 0;
  }
  const size_t non_shared = key.size() - shared;

  //将共同前缀长度、非共享部分长度、值长度追加到buffer_中
  PutVarint32(&buffer_, shared);
  PutVarint32(&buffer_, non_shared);
  PutVarint32(&buffer_, value.size());

  //将key的非共享数据追加到buffer_中_
  buffer_.append(key.data() + shared, non_shared);
  //将Value数据追加到buffer_中
  buffer_.append(value.data(), value.size());

  //更新状态
  last_key_.resize(shared);
  last_key_.append(key.data() + shared, non_shared);
  assert(Slice(last_key_) == key);
  counter_++;
}

执行流程如下:

1. 判断 counter_ 是否大于 block_restart_interval,如果大于,则开启新的重启点,清空计数器并保存 buffer_ 中数据长度的值(该值即每个重启点的偏移量)压到 restarts_ 数组中。
2. 如果 counter_ 未超出配置的每个重启点可以保存的键-值对数值,则计算当前键和上一次保存键的共同前缀,然后将键-值对按格式保存到 buffer_ 中。
3. 更新状态,将 last_key_ 置为当前保存的 key,并且将 counter_ 加 1。

从上面的代码中可以看出,Add 中将所有的键-值对按格式保存到成员变量 buffer_ 中。实际生成 Block 的其实是 Finish 。代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// https://github.com/google/leveldb/blob/master/table/block_builder.cc

Slice BlockBuilder::Finish() {
  //将重启点偏移量写入buffer_中
  for (size_t i = 0; i < restarts_.size(); i++) {
      PutFixed32(&buffer_, restarts_[i]);
  }
  PutFixed32(&buffer_, restarts_.size());
  finished_ = true;
  return Slice(buffer_);
}

Finish 首先将所有重启点偏移量的值依次以 4 字节大小追加到 buffer_ 字符串,最后将重启点个数继续以 4 字节大小追加到 buffer_ 后部,此时返回的结果就是一个完整的 Block。

读取Block

读取 Block 由 Block 类实现,其主要依靠 NewIterator 生成一个 Block 迭代器,再借助迭代器的 Seek 来查找对应的 Key。首先来看看 Block 的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// https://github.com/google/leveldb/blob/master/table/block.h

class Block {
  public:
  // Initialize the block with the specified contents.
  explicit Block(const BlockContents& contents);

  Block(const Block&) = delete;
  Block& operator=(const Block&) = delete;

  ~Block();

  size_t size() const { return size_; }
  Iterator* NewIterator(const Comparator* comparator);

  private:
  class Iter;

  uint32_t NumRestarts() const;

  const char* data_;
  size_t size_;
  uint32_t restart_offset_;  // Offset in data_ of restart array
  bool owned_;               // Block owns data_[]
};

读取一个块通过在 NewIterator 中生成一个迭代器来实现,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// https://github.com/google/leveldb/blob/master/table/block.cc

Iterator* Block::NewIterator(const Comparator* comparator) {
  if (size_ < sizeof(uint32_t)) {
      return NewErrorIterator(Status::Corruption("bad block contents"));
  }
  const uint32_t num_restarts = NumRestarts();
  if (num_restarts == 0) {
      return NewEmptyIterator();
  } else {
      return new Iter(comparator, data_, restart_offset_, num_restarts);
  }
}

这里的逻辑比较简单,就不多作介绍了。

核心的查找逻辑主要是迭代器中的 Seek 下面直接看代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// https://github.com/google/leveldb/blob/master/table/block.cc

void Seek(const Slice& target) override {
  uint32_t left = 0;
  uint32_t right = num_restarts_ - 1;
  int current_key_compare = 0;

  if (Valid()) {
      current_key_compare = Compare(key_, target);
      if (current_key_compare < 0) {
          left = restart_index_;
      } else if (current_key_compare > 0) {
          right = restart_index_;
      } else {
          return;
      }
  }

  //通过重启点进行二分查找
  while (left < right) {
      uint32_t mid = (left + right + 1) / 2;
      uint32_t region_offset = GetRestartPoint(mid);
      uint32_t shared, non_shared, value_length;
      const char* key_ptr =
          DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
                      &non_shared, &value_length);
      if (key_ptr == nullptr || (shared != 0)) {
          CorruptionError();
          return;
      }
      Slice mid_key(key_ptr, non_shared);
      //如果key小于target,则将left置为mid
      if (Compare(mid_key, target) < 0) {
          left = mid;

          //如果key大于等于target,则将right置为mid-1
      } else {
          right = mid - 1;
      }
  }

  assert(current_key_compare == 0 || Valid());
  //在块中线性查找,依次遍历每一个K-V对,将key与target对比,直到找到第一个大于等于target的后返
  bool skip_seek = left == restart_index_ && current_key_compare < 0;
  if (!skip_seek) {
      SeekToRestartPoint(left);
  }

  while (true) {
      if (!ParseNextKey()) {
          return;
      }
      if (Compare(key_, target) >= 0) {
          return;
      }
  }
}

查找逻辑如下:

1. 对重启点数组进行二分查找,找到可能包含数据的重启点。
2. 在块中线性查找,依次遍历每一个 K-V 对,将 key 与 target 对比。
3. 找到第一个 key 大于等于 target 的后将该 K-V 对存储后返回。

SSTable读写

生成SSTable

SSTable 的生成主要在 TableBuilder 中实现,下面先看看它的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// https://github.com/google/leveldb/blob/master/include/leveldb/table_builder.h

class LEVELDB_EXPORT TableBuilder {
  public:
  TableBuilder(const Options& options, WritableFile* file);

  TableBuilder(const TableBuilder&) = delete;
  TableBuilder& operator=(const TableBuilder&) = delete;

  ~TableBuilder();

  Status ChangeOptions(const Options& options);

  void Add(const Slice& key, const Slice& value);

  void Flush();

  Status status() const;

  Status Finish();

  void Abandon();

  uint64_t NumEntries() const;

  uint64_t FileSize() const;

  private:
  bool ok() const { return status().ok(); }
  void WriteBlock(BlockBuilder* block, BlockHandle* handle);
  void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);

  struct Rep;
  Rep* rep_;
};

在介绍该 TableBuilder 的核心逻辑之前,首先我们要看看里面的一个结构体 Rep。其结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct TableBuilder::Rep {
  Rep(const Options& opt, WritableFile* f)
      : options(opt),
  index_block_options(opt),
  file(f),
  offset(0),
  data_block(&options),
  index_block(&index_block_options),
  num_entries(0),
  closed(false),
  filter_block(opt.filter_policy == nullptr
               ? nullptr
               : new FilterBlockBuilder(opt.filter_policy)),
  pending_index_entry(false) {
      index_block_options.block_restart_interval = 1;
  }

  Options options;
  Options index_block_options;
  WritableFile* file;
  uint64_t offset;
  Status status;
  BlockBuilder data_block;
  BlockBuilder index_block;
  std::string last_key;
  int64_t num_entries;
  bool closed;  // Either Finish() or Abandon() has been called.
  FilterBlockBuilder* filter_block;
  bool pending_index_entry;
  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
};

我们需要注意的关键变量如下:

  • file:SSTable 生成的文件。
  • data_block:用于生成 SSTable 的数据区域。
  • index_block:用于生成 SSTable 的索引区域。
  • pending_index_entry:决定是否需要写数据索引。
  • **pending_handle:**写数据索引的方法。SSTable中每次完整写入一个块后需要生成该块的索引,索引中的键是当前块最大键与即将插入的键的最短分隔符,例如一个块中最大键为 abceg,即将插入的键为 abcqddh,则二者之间的最小分隔符为 abcf。

了解完结构后,接着就看看生成 SSTable 的核心函数 AddFinish

首先来看 Add,其代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// https://github.com/google/leveldb/blob/master/table/table_builder.cc

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
      assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  //判断是否需要写入数据索引块中
  if (r->pending_index_entry) {
      assert(r->data_block.empty());

      //找到最短分隔符,即大于等于上一个块最大的键,小于下一个块最小的键
      r->options.comparator->FindShortestSeparator(&r->last_key, key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      //在数据索引块中写入key和BlockHandle
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
  }

  //写入元数据块中
  if (r->filter_block != nullptr) {
      r->filter_block->AddKey(key);
  }

  //修改last_key为当前要插入的key
  r->last_key.assign(key.data(), key.size());
  r->num_entries++;

  //写入数据块中
  r->data_block.Add(key, value);

  //判断数据块大小是否大于配置的块大小,如果大于则调用Flush写入SSTable文件并刷新到硬盘
  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
      Flush();
  }
}

Add 主要就是调用生成数据块与数据索引块的方法 BlockBuilder::Add 以及生成元数据块的方法 FilterBlockBuilder::Add 依次将键值对加入数据索引块、元数据块以及数据块。

可以看到,最后会判断数据块大小是否大于配置的块大小,如果大于则调用 Flush 写入 SSTable 文件并刷新到硬盘中。我们接着来看看 Flush 的执行逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// https://github.com/google/leveldb/blob/master/table/table_builder.cc

void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);

  //写入数据块
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
      //将pending_index_entry修改为true,表明下一次将写入数据索引块。
      r->pending_index_entry = true;
      //将文件刷新到磁盘	
      r->status = r->file->Flush();
  }
  if (r->filter_block != nullptr) {
      r->filter_block->StartBlock(r->offset);
  }
}

执行逻辑如下:

1. 将数据写入数据块中。
2. 将 pending_index_entry 修改为 true,表明下一次调用 `Add` 时将写入数据索引块。
3. 将文件刷新到磁盘中。

介绍完了 Add,下面来看看 Finish

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// https://github.com/google/leveldb/blob/master/table/table_builder.cc

Status TableBuilder::Finish() {
  Rep* r = rep_;
  //写入SSTable文件并刷新到硬盘中
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  //写入元数据块
  if (ok() && r->filter_block != nullptr) {
      WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                    &filter_block_handle);
  }

  //写入元数据块索引
  if (ok()) {
      BlockBuilder meta_index_block(&r->options);
      if (r->filter_block != nullptr) {
          std::string key = "filter.";
          key.append(r->options.filter_policy->Name());
          std::string handle_encoding;
          filter_block_handle.EncodeTo(&handle_encoding);
          meta_index_block.Add(key, handle_encoding);
      }

      WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  //写入数据块索引
  if (ok()) {
      if (r->pending_index_entry) {
          r->options.comparator->FindShortSuccessor(&r->last_key);
          std::string handle_encoding;
          r->pending_handle.EncodeTo(&handle_encoding);
          r->index_block.Add(r->last_key, Slice(handle_encoding));
          r->pending_index_entry = false;
      }
      WriteBlock(&r->index_block, &index_block_handle);
  }

  //写入尾部
  if (ok()) {
      Footer footer;
      footer.set_metaindex_handle(metaindex_block_handle);
      footer.set_index_handle(index_block_handle);
      std::string footer_encoding;
      footer.EncodeTo(&footer_encoding);
      r->status = r->file->Append(footer_encoding);
      if (r->status.ok()) {
          r->offset += footer_encoding.size();
      }
  }
  return r->status;
}

Finish 会按照我们一开始给出的 SSTable 的格式,将数据分别写入数据块、数据块索引、元数据块、元数据块索引、尾部。最后生成 SSTable。

读取SSTable

介绍完了写入,我们再来看看读取。SSTable 读取的逻辑与 Block 类似,都是借助于迭代器实现的。主要实现代码在 Table 类中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// https://github.com/google/leveldb/blob/master/include/leveldb/table.h

class LEVELDB_EXPORT Table {
  public:
  static Status Open(const Options& options, RandomAccessFile* file,
                     uint64_t file_size, Table** table);

  Table(const Table&) = delete;
  Table& operator=(const Table&) = delete;

  ~Table();

  Iterator* NewIterator(const ReadOptions&) const;

  uint64_t ApproximateOffsetOf(const Slice& key) const;

  private:
  friend class TableCache;
  struct Rep;

  static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);

  explicit Table(Rep* rep) : rep_(rep) {}

  Status InternalGet(const ReadOptions&, const Slice& key, void* arg,
                     void (*handle_result)(void* arg, const Slice& k,
                                           const Slice& v));

  void ReadMeta(const Footer& footer);
  void ReadFilter(const Slice& filter_handle_value);

  Rep* const rep_;
};

这里我们主要关注生成迭代器的 NewIterator 和 第二层生成迭代器的 BlockReader

首先看 NewIterator 的代码:

1
2
3
4
5
6
7
// https://github.com/google/leveldb/blob/master/table/table.cc

Iterator* Table::NewIterator(const ReadOptions& options) const {
  return NewTwoLevelIterator(
      rep_->index_block->NewIterator(rep_->options.comparator),
      &Table::BlockReader, const_cast<Table*>(this), options);
}

这里返回了一个双层迭代器 NewTwoLevelIterator。第一层为数据索引块的迭代器,即 rep_->index_block->NewIterator。通过第一层的数据索引块迭代器查找一个键应该属于的块,然后通过第二层迭代器去读取这个块并查找该键。

接着看生成第二层块迭代器的 BlockReader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// https://github.com/google/leveldb/blob/master/table/table.cc

Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
                           const Slice& index_value) {
  Table* table = reinterpret_cast<Table*>(arg);
  Cache* block_cache = table->rep_->options.block_cache;
  Block* block = nullptr;
  Cache::Handle* cache_handle = nullptr;

  BlockHandle handle;
  Slice input = index_value;
  Status s = handle.DecodeFrom(&input);

  if (s.ok()) {
      //contents保存一个块的内容
      BlockContents contents;
      if (block_cache != nullptr) {
          char cache_key_buffer[16];
          EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
          EncodeFixed64(cache_key_buffer + 8, handle.offset());
          Slice key(cache_key_buffer, sizeof(cache_key_buffer));
          cache_handle = block_cache->Lookup(key);

          if (cache_handle != nullptr) {
              block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
          } else {
              //通过BlockHandle存储的偏移量和大小读取一个块的数据到contents
              s = ReadBlock(table->rep_->file, options, handle, &contents);
              if (s.ok()) {
                  //根据contents生成一个Block结构
                  block = new Block(contents);
                  if (contents.cachable && options.fill_cache) {
                      cache_handle = block_cache->Insert(key, block, block->size(),
                                                         &DeleteCachedBlock);
                  }
              }
          }
      } else {
          s = ReadBlock(table->rep_->file, options, handle, &contents);
          if (s.ok()) {
              block = new Block(contents);
          }
      }
  }

  Iterator* iter;
  if (block != nullptr) {
      //生成块迭代器,通过该迭代器读取数据
      iter = block->NewIterator(table->rep_->options.comparator);
      if (cache_handle == nullptr) {
          iter->RegisterCleanup(&DeleteBlock, block, nullptr);
      } else {
          iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
      }
  } else {
      iter = NewErrorIterator(s);
  }
  return iter;
}

SSTable 的读取先通过第一层迭代器(即数据索引)获取到一个键需要查找的块位置,读取该块的内容并且构造第二层迭代器遍历该块,通过两层迭代器即可在 SSTable 中进行键的查找。

布隆过滤器

如果在 LevelDB 中查找某个不存在的键,必须先检查内存表 MemTable,然后逐层查找,为了优化这种读取,LevelDB 中会使用布隆过滤器。当布隆过滤器判定键不存在时,可以直接返回,无须继续查找。

这里就不过多介绍原理,如果想了解可以看看我的往期博客 海量数据处理(一) :位图与布隆过滤器的概念以及实现

实现

布隆过滤器继承了 LevelDB 中抽象出的过滤器纯虚类 FilterPolicy,其结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// https://github.com/google/leveldb/blob/master/include/leveldb/filter_policy.h

class LEVELDB_EXPORT FilterPolicy {
  public:
  virtual ~FilterPolicy();

  virtual const char* Name() const = 0;

  virtual void CreateFilter(const Slice* keys, int n,
                            std::string* dst) const = 0;

  virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const = 0;
};

FilterPolicy 定义了几个接口:

  • Name:返回过滤器名称。
  • CreateFilter:将一个字符串加入过滤器中。
  • KeyMayMatch:通过过滤器内容判断一个元素是否存在。

接下来我们看看 BloomFilterPolicy 是如何实现这些接口的,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// https://github.com/google/leveldb/blob/master/util/bloom.cc

class BloomFilterPolicy : public FilterPolicy {
  public:
  explicit BloomFilterPolicy(int bits_per_key) : bits_per_key_(bits_per_key) {
      k_ = static_cast<size_t>(bits_per_key * 0.69);  // 0.69 =~ ln(2)
      if (k_ < 1) k_ = 1;
      if (k_ > 30) k_ = 30;
  }

  const char* Name() const override { return "leveldb.BuiltinBloomFilter2"; }

  void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
      size_t bits = n * bits_per_key_;

      if (bits < 64) bits = 64;

      size_t bytes = (bits + 7) / 8;
      bits = bytes * 8;

      const size_t init_size = dst->size();
      dst->resize(init_size + bytes, 0);
      dst->push_back(static_cast<char>(k_));  // Remember # of probes in filter
      char* array = &(*dst)[init_size];
      //依次处理每一个Key
      for (int i = 0; i < n; i++) {
          uint32_t h = BloomHash(keys[i]);
          //通过位运算获取delta
          const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits

          //计算哈希值,对每一个key计算k次哈希值(这里采用对每次计算出的的哈希值增加delta来模拟)
          for (size_t j = 0; j < k_; j++) {
              const uint32_t bitpos = h % bits;
              array[bitpos / 8] |= (1 << (bitpos % 8));
              h += delta;
          }
      }
  }

  bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override {
      const size_t len = bloom_filter.size();
      if (len < 2) return false;

      const char* array = bloom_filter.data();
      const size_t bits = (len - 1) * 8;
      const size_t k = array[len - 1];
      if (k > 30) {
          return true;
      }

      //计算出key的哈希
      uint32_t h = BloomHash(key);
      const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
      //判断对应位置是否全为1,如果有任何一个为0说明数据不可能存在布隆过滤器中,返回false,否则true
      for (size_t j = 0; j < k; j++) {
          const uint32_t bitpos = h % bits;
          if ((array[bitpos / 8] & (1 << (bitpos % 8))) == 0) return false;
          h += delta;
      }
      return true;
  }

  private:
  size_t bits_per_key_;
  size_t k_;
};

具体逻辑都标识在了注释中,就不多说了,这里提一下这里用到的一个哈希小技巧:

  • 为了避免哈希冲突,大部分布隆过滤器中都会采用多种哈希算法来计算。LevelDB 为了简化规则,使用位运算 计算出 delta,对每轮计算出的哈希值累加上 delta,模拟多轮哈希计算。

应用

LevelDB 中具体使用布隆过滤器时又封装了两个类,分别为 FilterBlockBuilder 和 FilterBlockReader。

FilterBlockBuilder 的主要功能是通过调用 BloomFilterPolicy 的 CreateFilter 方法生成布隆过滤器,并且将布隆过滤器的内容写入 SSTable 的元数据块。其结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// https://github.com/google/leveldb/blob/master/table/filter_block.h

class FilterBlockBuilder {
  public:
  explicit FilterBlockBuilder(const FilterPolicy*);

  FilterBlockBuilder(const FilterBlockBuilder&) = delete;
  FilterBlockBuilder& operator=(const FilterBlockBuilder&) = delete;

  void StartBlock(uint64_t block_offset);
  void AddKey(const Slice& key);
  Slice Finish();

  private:
  void GenerateFilter();

  const FilterPolicy* policy_;
  std::string keys_;             // Flattened key contents
  std::vector<size_t> start_;    // Starting index in keys_ of each key
  std::string result_;           // Filter data computed so far
  std::vector<Slice> tmp_keys_;  // policy_->CreateFilter() argument
  std::vector<uint32_t> filter_offsets_;
};
  • 成员变量
    • policy_:实现了 FilterPolicy 接口的类,在布隆过滤器中为 BloomFilterPolicy。
    • keys_:生成布隆过滤器的键。
    • start_:数组类型,保存 keys_ 参数中每一个键的开始索引。
    • result_:保存生成的布隆过滤器内容。
    • tmp_keys_:生成布隆过滤器时,会通过 keys_ 和 start_ 拆分出每一个键,将拆分出的每一个键保存到 tmp_keys_ 数组中。
    • filter_offsets_:过滤器偏移量,即每一个过滤器在元数据块中的偏移量。

写入的核心逻辑主要在 AddKeyFinish 中,代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// https://github.com/google/leveldb/blob/master/table/filter_block.cc

void FilterBlockBuilder::AddKey(const Slice& key) {
  Slice k = key;
  //记录key的索引位置
  start_.push_back(keys_.size());
  //将key追加到该字符串中
  keys_.append(k.data(), k.size());
}

Slice FilterBlockBuilder::Finish() {
  //写入内容
  if (!start_.empty()) {
      GenerateFilter();
  }

  //写入偏移量
  const uint32_t array_offset = result_.size();
  for (size_t i = 0; i < filter_offsets_.size(); i++) {
      PutFixed32(&result_, filter_offsets_[i]);
  }
  //写入总大小
  PutFixed32(&result_, array_offset);
  //写入基数
  result_.push_back(kFilterBaseLg);  // Save encoding parameter in result
  return Slice(result_);
}

首先调用 AddKey 记录 Key 的索引位置,并将 Key 追加到 Keys_中。接着,调用 Finish 生成一个元数据块,并分别写入布隆过滤器的内容、偏移量、内容总大小和基数。

FilterBlockReader 用于查找一个元素是否在一个布隆过滤器中,其结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// https://github.com/google/leveldb/blob/master/table/filter_block.h

class FilterBlockReader {
  public:
  // REQUIRES: "contents" and *policy must stay live while *this is live.
  FilterBlockReader(const FilterPolicy* policy, const Slice& contents);
  bool KeyMayMatch(uint64_t block_offset, const Slice& key);

  private:
  const FilterPolicy* policy_;
  const char* data_;    // Pointer to filter data (at block-start)
  const char* offset_;  // Pointer to beginning of offset array (at block-end)
  size_t num_;          // Number of entries in offset array
  size_t base_lg_;      // Encoding parameter (see kFilterBaseLg in .cc file)
};
  • 成员变量
    • policy_:实现了 FilterPolicy 接口的类,在布隆过滤器中为 BloomFilterPolicy。
    • data_:指向元数据块的开始位置。
    • offset_:指向元数据块中过滤器偏移量的开始位置。
    • num_:过滤器偏移量的个数。
    • base_lg_:过滤器基数。

FilterBlockReader 中的 KeyMayMatch 根据数据块偏移量找到对应的过滤器内容,然后调用 BloomFilterPolicy 中的 KeyMayMatch 方法判断一个元素是否在该过滤器之中。代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// https://github.com/google/leveldb/blob/master/table/filter_block.cc

bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, const Slice& key) {
  uint64_t index = block_offset >> base_lg_;
  if (index < num_) {
      uint32_t start = DecodeFixed32(offset_ + index * 4);
      uint32_t limit = DecodeFixed32(offset_ + index * 4 + 4);
      if (start <= limit && limit <= static_cast<size_t>(offset_ - data_)) {
          Slice filter = Slice(data_ + start, limit - start);
          return policy_->KeyMayMatch(key, filter);
      } else if (start == limit) {
          // Empty filters do not match any keys
          return false;
      }
  }

LRU Cache

我们希望经常使用的 SSTable 内容尽量保存在内存中,但如果磁盘中的 SSTable 文件的总大小大于服务器内存大小,或者需要控制 LevelDB 的内存总占用量时,就需要使用 LRU(least recentlyused)Cache 来管理内存。LRU 是一种缓存置换策略,根据该策略不仅可以管理内存的占用量,还可以将热数据尽量保存到内存中,以加快读取速度。

内存是有限并且昂贵的资源,因此 LevelDB 通过 LRU 策略管理读取到内存的数据。LRU 基于这样一种假设:如果一个资源最近没有或者很少被使用到,那么将来也会很少甚至不被使用。因此如果内存不足,需要淘汰数据时,可以根据 LRU 策略来执行。

这里就不过多介绍原理,如果想了解可以看看我的往期博客 高级数据结构与算法 | LRU缓存机制(Least Recently Used)

结构

为了方便拓展其他的缓存置换算法(目前仅实现了 LRU),LevelDB 抽象出了一个 Cache 纯虚类,结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// https://github.com/google/leveldb/blob/master/include/leveldb/cache.h

class LEVELDB_EXPORT Cache {
  public:
  Cache() = default;

  Cache(const Cache&) = delete;
  Cache& operator=(const Cache&) = delete;

  virtual ~Cache();

  struct Handle {};

  virtual Handle* Insert(const Slice& key, void* value, size_t charge,
                         void (*deleter)(const Slice& key, void* value)) = 0;

  virtual Handle* Lookup(const Slice& key) = 0;

  virtual void Release(Handle* handle) = 0;

  virtual void* Value(Handle* handle) = 0;

  virtual void Erase(const Slice& key) = 0;

  virtual uint64_t NewId() = 0;

  virtual void Prune() {}

  virtual size_t TotalCharge() const = 0;

  private:
  void LRU_Remove(Handle* e);
  void LRU_Append(Handle* e);
  void Unref(Handle* e);

  struct Rep;
  Rep* rep_;
};

LevelDB 中 LRU 由 LRUCache 实现,并且 LRU 节点由 LRUHandle 实现。首先我们来看看 LRUHandle:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// https://github.com/google/leveldb/blob/master/util/cache.cc

struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash;
  LRUHandle* next;	
  LRUHandle* prev;
  size_t charge;  // TODO(opt): Only allow uint32_t?
  size_t key_length;
  bool in_cache;     // Whether entry is in the cache.
  uint32_t refs;     // References, including cache reference, if present.
  uint32_t hash;     // Hash of key(); used for fast sharding and comparisons
  char key_data[1];  // Beginning of key

  Slice key() const {
      assert(next != this);

      return Slice(key_data, key_length);
  }
};
  • 成员变量
    • value:具体的值,指针类型。
    • deleter:自定义回收节点的回调函数。
    • next_hash:用于 hashtable 冲突时,下一个节点。
    • next:代表 LRU 中双向链表中下一个节点。
    • prev:代表 LRU 中双向链表中上一个节点。
    • charge:记录当前 value 所占用的内存大小,用于后面超出容量后需要进行 lru。
    • key_length:数据 key 的长度。
    • in_cache:表示是否在缓存中。
    • refs:引用计数,因为当前节点可能会被多个组件使用,不能简单的删除。
    • hash:记录当前 key 的 hash 值。

这个节点设计的非常巧妙,既可以用来当做 LRU 节点,也可以用来当作 hashtable 的节点。

接着我们来看看 LRUCache 的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// https://github.com/google/leveldb/blob/master/util/cache.cc

class LRUCache {
  public:
  LRUCache();
  ~LRUCache();

  void SetCapacity(size_t capacity) { capacity_ = capacity; }

  Cache::Handle* Insert(const Slice& key, uint32_t hash, void* value,
                        size_t charge,
                        void (*deleter)(const Slice& key, void* value));
  Cache::Handle* Lookup(const Slice& key, uint32_t hash);
  void Release(Cache::Handle* handle);
  void Erase(const Slice& key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
      MutexLock l(&mutex_);
      return usage_;
  }

  private:
  void LRU_Remove(LRUHandle* e);
  void LRU_Append(LRUHandle* list, LRUHandle* e);
  void Ref(LRUHandle* e);
  void Unref(LRUHandle* e);
  bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  size_t capacity_;

  mutable port::Mutex mutex_;
  size_t usage_ GUARDED_BY(mutex_);

  LRUHandle lru_ GUARDED_BY(mutex_);

  LRUHandle in_use_ GUARDED_BY(mutex_);

  HandleTable table_ GUARDED_BY(mutex_);
};

实现

LRU Cache 的实现有如下两处细节需要注意:

  • 减小锁的粒度:LRUCache 的并发操作不安全,因此操作时需要加锁。为了减小锁的粒度,LevelDB 中通过哈希将键分为 16 个段,可以理解为有 16 个相同的 LRU Cache 结构,每次进行 Cache 操作时需要先去查找键属于的段。
  • 缓存淘汰:每个LRU Cache结构中有两个成员变量:lru_ 和 in_use_ 。lru_ 双向链表中的节点是可以进行淘汰的,而 in_use_ 双向链表中的节点表示正在使用,因此不可以进行淘汰。

减小锁的粒度

LevelDB 为了减小锁的粒度,封装了一个 ShardedLRUCache,其中有一个大小为 16(默认值) 的 shard_ 成员,shard_ 成员的每个元素均为一个 LRUCache 结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// ShardedLRUCache
class ShardedLRUCache : public Cache {
  private:
  LRUCache shard_[kNumShards];
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
      return Hash(s.data(), s.size(), 0);
  }

  static uint32_t Shard(uint32_t hash) { return hash >> (32 - kNumShardBits); }

  public:
  explicit ShardedLRUCache(size_t capacity) : last_id_(0) {
      const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
      for (int s = 0; s < kNumShards; s++) {
          shard_[s].SetCapacity(per_shard);
      }
  }
  ~ShardedLRUCache() override {}
  Handle* Insert(const Slice& key, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value)) override {
      const uint32_t hash = HashSlice(key);
      return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
  }
  Handle* Lookup(const Slice& key) override {
      const uint32_t hash = HashSlice(key);
      return shard_[Shard(hash)].Lookup(key, hash);
  }
  void Release(Handle* handle) override {
      LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
      shard_[Shard(h->hash)].Release(handle);
  }
  void Erase(const Slice& key) override {
      const uint32_t hash = HashSlice(key);
      shard_[Shard(hash)].Erase(key, hash);
  }
  void* Value(Handle* handle) override {
      return reinterpret_cast<LRUHandle*>(handle)->value;
  }
  uint64_t NewId() override {
      MutexLock l(&id_mutex_);
      return ++(last_id_);
  }
  void Prune() override {
      for (int s = 0; s < kNumShards; s++) {
          shard_[s].Prune();
      }
  }
  size_t TotalCharge() const override {
      size_t total = 0;
      for (int s = 0; s < kNumShards; s++) {
          total += shard_[s].TotalCharge();
      }
      return total;
  }
};

为了能使哈希值刚好能够映射 shard_ 数组,其通过位操作再次对哈希值进行处理,如下代码:

1
2
3
// https://github.com/google/leveldb/blob/master/util/cache.cc

static uint32_t Shard(uint32_t hash) { return hash >> (32 - kNumShardBits); }

其将哈希值右移 28 位,只取最高的 4 位,这样就能够保证处理过的哈希值刚好小于 16。

缓存淘汰

每个LRU Cache结构中有两个成员变量:lru_ 和 in_use_ 。lru_ 双向链表中的节点是可以进行淘汰的,而 in_use_ 双向链表中的节点表示正在使用,因此不可以进行淘汰。如果内存超出限制需要淘汰一个节点时,LevelDB 会将 lru_ 链表中的节点逐个淘汰。

那我们如何决定节点放置的规则呢?其采用如下规则:如果一个缓存节点的 in_cache 为 true,并且 refs 等于 1,则放置到 lru_ 中;如果 in_cache 为 true,并且 refs 大于等于 2,则放置到 in_use_ 中。

  • 对于 in_cache,其会在以下情况变为 false:
    • 删除该节点后。
    • 调用 LRUCache 的析构函数时,会将所有节点的 in_cache 置为 false。
    • 插入一个节点时,如果已经存在一个键值相同的节点,则旧节点的 in_cache 会置为 false。
  • 对于 refs,其变化规则如下:
    • 每次调用 Ref 函数,会将 refs 变量加 1,调用 Unref 函数,会将 refs 变量减 1。
    • 插入一个节点时,该节点会放到 in_use_ 链表中,并且初始的引用计数为 2,不再使用该节点时将引用计数减 1,如果此时节点也不再被其他地方引用,那么引用计数为 1,将其放到 lru_ 链表中。
    • 查找一个节点时,如果查找成功,则调用 Ref,将该节点的引用计数加 1,如果引用计数大于等于 2,会将节点放到 in_use_ 链表中,同理,不再使用该节点时将引用计数减 1,如果此时节点也不再被其他地方引用,那么引用计数为 1,将其放到 lru_ 链表中。

RefUnref 代码如下:

1
2
3
4
5
6
7
8
9
// https://github.com/google/leveldb/blob/master/util/cache.cc

void LRUCache::Ref(LRUHandle* e) {
if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
  LRU_Remove(e);
  LRU_Append(&in_use_, e);
}
e->refs++;
}

如果缓存节点在 lru_ 链表中(refs 为 1,in_cache 为 true),则首先从 lru_ 链表中删除该节点,然后将节点放到 in_use_ 链表中。最后将节点的 refs 变量加 1。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// https://github.com/google/leveldb/blob/master/util/cache.cc

void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) {  // Deallocate.
      assert(!e->in_cache);
      (*e->deleter)(e->key(), e->value);
      free(e);
  } else if (e->in_cache && e->refs == 1) {
      // No longer in use; move to lru_ list.
      LRU_Remove(e);
      LRU_Append(&lru_, e);
  }
}

Unref 首先将节点的 refs 变量减 1,然后判断如果 refs 已经等于 0,则删除并释放该节点,否则,如果 refs 变量等于 1 并且 in_cache 为 true,则将该节点从 in_use_ 链表中删除并且移动到 lru_ 链表中。如果内存超出限制需要淘汰一个节点时,LevelDB 会将 lru_ 链表中的节点逐个淘汰。

应用

LevelDB 中 Cache 缓存的主要是 SSTable,即缓存节点的键为 8 字节的文件序号,值为一个包含了 SSTable 实例的结构。其主要逻辑由 TableCache 实现,首先我们先来看看它的结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// https://github.com/google/leveldb/blob/master/db/table_cache.h

class TableCache {
  public:
  TableCache(const std::string& dbname, const Options& options, int entries);
  ~TableCache();

  Iterator* NewIterator(const ReadOptions& options, uint64_t file_number,
                        uint64_t file_size, Table** tableptr = nullptr);

  Status Get(const ReadOptions& options, uint64_t file_number,
             uint64_t file_size, const Slice& k, void* arg,
             void (*handle_result)(void*, const Slice&, const Slice&));

  void Evict(uint64_t file_number);

  private:
  Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);

  Env* const env_;
  const std::string dbname_;
  const Options& options_;
  Cache* cache_;
};
  • 成员变量

    • env_:用于读取 SSTable 文件。
    • dbname_:SSTable 名字。
    • options_:Cache 参数配置。
    • cache_:缓存基类句柄。

这里的核心逻辑 FindTable 方法会使用文件序号(file_number)作为键,并且在 cache_ 中查找是否存在该键,如果不存在,则需要打开一个 SSTable,经过处理之后作为值插入 cache_ 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
                           Cache::Handle** handle) {
  Status s;
  char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, sizeof(buf));
  //在缓存中查找key是否存在
  *handle = cache_->Lookup(key);

  //如果不存在,则打开一个SSTable文件
  if (*handle == nullptr) {
      //生成fname和RandomAccessFile实例
      std::string fname = TableFileName(dbname_, file_number);
      RandomAccessFile* file = nullptr;
      Table* table = nullptr;
      s = env_->NewRandomAccessFile(fname, &file);
      if (!s.ok()) {
          std::string old_fname = SSTTableFileName(dbname_, file_number);
          if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
              s = Status::OK();
          }
      }

      //打开SSTable文件并且生成一个Table实例,并保存到table变量中。
      if (s.ok()) {
          s = Table::Open(options_, file, file_size, &table);
      }

      if (!s.ok()) {
          assert(table == nullptr);
          delete file;
      } else {
          TableAndFile* tf = new TableAndFile;
          tf->file = file;
          tf->table = table;
          //以文件序号为key,TableAndFile为Value,插入缓存中。
          *handle = cache_->Insert(key, tf, 1, &DeleteEntry);
      }
  }
  return s;
}

SSTable 在 Cache 中缓存时的键为文件序列号,值为一个 TableAndFile 实例,该实例中包括两个成员变量,分别为 file 和 table,查找时通过保存在 table 变量中的 Table 实例迭代器进行查找。

Licensed under CC BY-NC-SA 4.0
最后更新于 May 23, 2022 23:57 CST
Built with Hugo
主题 StackJimmy 设计