1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
// https://github.com/google/leveldb/blob/master/db/db_impl.cc
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // null if options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
}
};
mutex_.AssertHeld();
//打开log文件
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}
//创建log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr);
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long)log_number);
//读取所有的records并写入一个memtable
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;
//循环读取日志文件
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
//将records写入memtable
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
//如果memtable大于阈值,则将其转换成sstable(默认4MB)
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
if (!status.ok()) {
break;
}
}
}
delete file;
//判断是否应该继续重复使用最后一个日志文件
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(mem_ == nullptr);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != nullptr) {
mem_ = mem;
mem = nullptr;
} else {
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}
if (mem != nullptr) {
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
mem->Unref();
}
return status;
}
|