// db_impl.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/db_impl.h" 6 7 #include <stdint.h> 8 #include <stdio.h> 9 10 #include <algorithm> 11 #include <atomic> 12 #include <set> 13 #include <string> 14 #include <vector> 15 16 #include "db/builder.h" 17 #include "db/db_iter.h" 18 #include "db/dbformat.h" 19 #include "db/filename.h" 20 #include "db/log_reader.h" 21 #include "db/log_writer.h" 22 #include "db/memtable.h" 23 #include "db/table_cache.h" 24 #include "db/version_set.h" 25 #include "db/write_batch_internal.h" 26 #include "leveldb/db.h" 27 #include "leveldb/env.h" 28 #include "leveldb/status.h" 29 #include "leveldb/table.h" 30 #include "leveldb/table_builder.h" 31 #include "port/port.h" 32 #include "table/block.h" 33 #include "table/merger.h" 34 #include "table/two_level_iterator.h" 35 #include "util/coding.h" 36 #include "util/logging.h" 37 #include "util/mutexlock.h" 38 39 namespace leveldb { 40 41 const int kNumNonTableCacheFiles = 10; 42 43 // Information kept for every waiting writer 44 struct DBImpl::Writer { 45 explicit Writer(port::Mutex* mu) 46 : batch(nullptr), sync(false), done(false), cv(mu) {} 47 48 Status status; 49 WriteBatch* batch; 50 bool sync; 51 bool done; 52 port::CondVar cv; 53 }; 54 55 struct DBImpl::CompactionState { 56 // Files produced by compaction 57 struct Output { 58 uint64_t number; 59 uint64_t file_size; 60 InternalKey smallest, largest; 61 }; 62 63 Output* current_output() { return &outputs[outputs.size() - 1]; } 64 65 explicit CompactionState(Compaction* c) 66 : compaction(c), 67 smallest_snapshot(0), 68 outfile(nullptr), 69 builder(nullptr), 70 total_bytes(0) {} 71 72 Compaction* const compaction; 73 74 // Sequence numbers < smallest_snapshot are not significant since we 75 // will never have to service a snapshot below smallest_snapshot. 
76 // Therefore if we have seen a sequence number S <= smallest_snapshot, 77 // we can drop all entries for the same key with sequence numbers < S. 78 SequenceNumber smallest_snapshot; 79 80 std::vector<Output> outputs; 81 82 // State kept for output being generated 83 WritableFile* outfile; 84 TableBuilder* builder; 85 86 uint64_t total_bytes; 87 }; 88 89 // Fix user-supplied options to be reasonable 90 template <class T, class V> 91 static void ClipToRange(T* ptr, V minvalue, V maxvalue) { 92 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; 93 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; 94 } 95 Options SanitizeOptions(const std::string& dbname, 96 const InternalKeyComparator* icmp, 97 const InternalFilterPolicy* ipolicy, 98 const Options& src) { 99 Options result = src; 100 result.comparator = icmp; 101 result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; 102 ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); 103 ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30); 104 ClipToRange(&result.max_file_size, 1 << 20, 1 << 30); 105 ClipToRange(&result.block_size, 1 << 10, 4 << 20); 106 if (result.info_log == nullptr) { 107 // Open a log file in the same directory as the db 108 src.env->CreateDir(dbname); // In case it does not exist 109 src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); 110 Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); 111 if (!s.ok()) { 112 // No place suitable for logging 113 result.info_log = nullptr; 114 } 115 } 116 if (result.block_cache == nullptr) { 117 result.block_cache = NewLRUCache(8 << 20); 118 } 119 return result; 120 } 121 122 static int TableCacheSize(const Options& sanitized_options) { 123 // Reserve ten files or so for other uses and give the rest to TableCache. 
124 return sanitized_options.max_open_files - kNumNonTableCacheFiles; 125 } 126 127 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) 128 : env_(raw_options.env), 129 internal_comparator_(raw_options.comparator), 130 internal_filter_policy_(raw_options.filter_policy), 131 options_(SanitizeOptions(dbname, &internal_comparator_, 132 &internal_filter_policy_, raw_options)), 133 owns_info_log_(options_.info_log != raw_options.info_log), 134 owns_cache_(options_.block_cache != raw_options.block_cache), 135 dbname_(dbname), 136 table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), 137 db_lock_(nullptr), 138 shutting_down_(false), 139 background_work_finished_signal_(&mutex_), 140 mem_(nullptr), 141 imm_(nullptr), 142 has_imm_(false), 143 logfile_(nullptr), 144 logfile_number_(0), 145 log_(nullptr), 146 seed_(0), 147 tmp_batch_(new WriteBatch), 148 background_compaction_scheduled_(false), 149 manual_compaction_(nullptr), 150 versions_(new VersionSet(dbname_, &options_, table_cache_, 151 &internal_comparator_)) {} 152 153 DBImpl::~DBImpl() { 154 // Wait for background work to finish. 
155 mutex_.Lock(); 156 shutting_down_.store(true, std::memory_order_release); 157 while (background_compaction_scheduled_) { 158 background_work_finished_signal_.Wait(); 159 } 160 mutex_.Unlock(); 161 162 if (db_lock_ != nullptr) { 163 env_->UnlockFile(db_lock_); 164 } 165 166 delete versions_; 167 if (mem_ != nullptr) mem_->Unref(); 168 if (imm_ != nullptr) imm_->Unref(); 169 delete tmp_batch_; 170 delete log_; 171 delete logfile_; 172 delete table_cache_; 173 174 if (owns_info_log_) { 175 delete options_.info_log; 176 } 177 if (owns_cache_) { 178 delete options_.block_cache; 179 } 180 } 181 182 Status DBImpl::NewDB() { 183 VersionEdit new_db; 184 new_db.SetComparatorName(user_comparator()->Name()); 185 new_db.SetLogNumber(0); 186 new_db.SetNextFile(2); 187 new_db.SetLastSequence(0); 188 189 const std::string manifest = DescriptorFileName(dbname_, 1); 190 WritableFile* file; 191 Status s = env_->NewWritableFile(manifest, &file); 192 if (!s.ok()) { 193 return s; 194 } 195 { 196 log::Writer log(file); 197 std::string record; 198 new_db.EncodeTo(&record); 199 s = log.AddRecord(record); 200 if (s.ok()) { 201 s = file->Close(); 202 } 203 } 204 delete file; 205 if (s.ok()) { 206 // Make "CURRENT" file that points to the new manifest file. 207 s = SetCurrentFile(env_, dbname_, 1); 208 } else { 209 env_->DeleteFile(manifest); 210 } 211 return s; 212 } 213 214 void DBImpl::MaybeIgnoreError(Status* s) const { 215 if (s->ok() || options_.paranoid_checks) { 216 // No change needed 217 } else { 218 Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); 219 *s = Status::OK(); 220 } 221 } 222 223 void DBImpl::DeleteObsoleteFiles() { 224 mutex_.AssertHeld(); 225 226 if (!bg_error_.ok()) { 227 // After a background error, we don't know whether a new version may 228 // or may not have been committed, so we cannot safely garbage collect. 
229 return; 230 } 231 232 // Make a set of all of the live files 233 std::set<uint64_t> live = pending_outputs_; 234 versions_->AddLiveFiles(&live); 235 236 std::vector<std::string> filenames; 237 env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose 238 uint64_t number; 239 FileType type; 240 std::vector<std::string> files_to_delete; 241 for (std::string& filename : filenames) { 242 if (ParseFileName(filename, &number, &type)) { 243 bool keep = true; 244 switch (type) { 245 case kLogFile: 246 keep = ((number >= versions_->LogNumber()) || 247 (number == versions_->PrevLogNumber())); 248 break; 249 case kDescriptorFile: 250 // Keep my manifest file, and any newer incarnations' 251 // (in case there is a race that allows other incarnations) 252 keep = (number >= versions_->ManifestFileNumber()); 253 break; 254 case kTableFile: 255 keep = (live.find(number) != live.end()); 256 break; 257 case kTempFile: 258 // Any temp files that are currently being written to must 259 // be recorded in pending_outputs_, which is inserted into "live" 260 keep = (live.find(number) != live.end()); 261 break; 262 case kCurrentFile: 263 case kDBLockFile: 264 case kInfoLogFile: 265 keep = true; 266 break; 267 } 268 269 if (!keep) { 270 files_to_delete.push_back(std::move(filename)); 271 if (type == kTableFile) { 272 table_cache_->Evict(number); 273 } 274 Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type), 275 static_cast<unsigned long long>(number)); 276 } 277 } 278 } 279 280 // While deleting all files unblock other threads. All files being deleted 281 // have unique names which will not collide with newly created files and 282 // are therefore safe to delete while allowing other threads to proceed. 
283 mutex_.Unlock(); 284 for (const std::string& filename : files_to_delete) { 285 env_->DeleteFile(dbname_ + "/" + filename); 286 } 287 mutex_.Lock(); 288 } 289 290 Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { 291 mutex_.AssertHeld(); 292 293 // Ignore error from CreateDir since the creation of the DB is 294 // committed only when the descriptor is created, and this directory 295 // may already exist from a previous failed creation attempt. 296 env_->CreateDir(dbname_); 297 assert(db_lock_ == nullptr); 298 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); 299 if (!s.ok()) { 300 return s; 301 } 302 303 if (!env_->FileExists(CurrentFileName(dbname_))) { 304 if (options_.create_if_missing) { 305 s = NewDB(); 306 if (!s.ok()) { 307 return s; 308 } 309 } else { 310 return Status::InvalidArgument( 311 dbname_, "does not exist (create_if_missing is false)"); 312 } 313 } else { 314 if (options_.error_if_exists) { 315 return Status::InvalidArgument(dbname_, 316 "exists (error_if_exists is true)"); 317 } 318 } 319 320 s = versions_->Recover(save_manifest); 321 if (!s.ok()) { 322 return s; 323 } 324 SequenceNumber max_sequence(0); 325 326 // Recover from all newer log files than the ones named in the 327 // descriptor (new log files may have been added by the previous 328 // incarnation without registering them in the descriptor). 329 // 330 // Note that PrevLogNumber() is no longer used, but we pay 331 // attention to it in case we are recovering a database 332 // produced by an older version of leveldb. 
333 const uint64_t min_log = versions_->LogNumber(); 334 const uint64_t prev_log = versions_->PrevLogNumber(); 335 std::vector<std::string> filenames; 336 s = env_->GetChildren(dbname_, &filenames); 337 if (!s.ok()) { 338 return s; 339 } 340 std::set<uint64_t> expected; 341 versions_->AddLiveFiles(&expected); 342 uint64_t number; 343 FileType type; 344 std::vector<uint64_t> logs; 345 for (size_t i = 0; i < filenames.size(); i++) { 346 if (ParseFileName(filenames[i], &number, &type)) { 347 expected.erase(number); 348 if (type == kLogFile && ((number >= min_log) || (number == prev_log))) 349 logs.push_back(number); 350 } 351 } 352 if (!expected.empty()) { 353 char buf[50]; 354 snprintf(buf, sizeof(buf), "%d missing files; e.g.", 355 static_cast<int>(expected.size())); 356 return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); 357 } 358 359 // Recover in the order in which the logs were generated 360 std::sort(logs.begin(), logs.end()); 361 for (size_t i = 0; i < logs.size(); i++) { 362 s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, 363 &max_sequence); 364 if (!s.ok()) { 365 return s; 366 } 367 368 // The previous incarnation may not have written any MANIFEST 369 // records after allocating this log number. So we manually 370 // update the file number allocation counter in VersionSet. 
371 versions_->MarkFileNumberUsed(logs[i]); 372 } 373 374 if (versions_->LastSequence() < max_sequence) { 375 versions_->SetLastSequence(max_sequence); 376 } 377 378 return Status::OK(); 379 } 380 381 Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, 382 bool* save_manifest, VersionEdit* edit, 383 SequenceNumber* max_sequence) { 384 struct LogReporter : public log::Reader::Reporter { 385 Env* env; 386 Logger* info_log; 387 const char* fname; 388 Status* status; // null if options_.paranoid_checks==false 389 void Corruption(size_t bytes, const Status& s) override { 390 Log(info_log, "%s%s: dropping %d bytes; %s", 391 (this->status == nullptr ? "(ignoring error) " : ""), fname, 392 static_cast<int>(bytes), s.ToString().c_str()); 393 if (this->status != nullptr && this->status->ok()) *this->status = s; 394 } 395 }; 396 397 mutex_.AssertHeld(); 398 399 // Open the log file 400 std::string fname = LogFileName(dbname_, log_number); 401 SequentialFile* file; 402 Status status = env_->NewSequentialFile(fname, &file); 403 if (!status.ok()) { 404 MaybeIgnoreError(&status); 405 return status; 406 } 407 408 // Create the log reader. 409 LogReporter reporter; 410 reporter.env = env_; 411 reporter.info_log = options_.info_log; 412 reporter.fname = fname.c_str(); 413 reporter.status = (options_.paranoid_checks ? &status : nullptr); 414 // We intentionally make log::Reader do checksumming even if 415 // paranoid_checks==false so that corruptions cause entire commits 416 // to be skipped instead of propagating bad information (like overly 417 // large sequence numbers). 
418 log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/); 419 Log(options_.info_log, "Recovering log #%llu", 420 (unsigned long long)log_number); 421 422 // Read all the records and add to a memtable 423 std::string scratch; 424 Slice record; 425 WriteBatch batch; 426 int compactions = 0; 427 MemTable* mem = nullptr; 428 while (reader.ReadRecord(&record, &scratch) && status.ok()) { 429 if (record.size() < 12) { 430 reporter.Corruption(record.size(), 431 Status::Corruption("log record too small", fname)); 432 continue; 433 } 434 WriteBatchInternal::SetContents(&batch, record); 435 436 if (mem == nullptr) { 437 mem = new MemTable(internal_comparator_); 438 mem->Ref(); 439 } 440 status = WriteBatchInternal::InsertInto(&batch, mem); 441 MaybeIgnoreError(&status); 442 if (!status.ok()) { 443 break; 444 } 445 const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + 446 WriteBatchInternal::Count(&batch) - 1; 447 if (last_seq > *max_sequence) { 448 *max_sequence = last_seq; 449 } 450 451 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { 452 compactions++; 453 *save_manifest = true; 454 status = WriteLevel0Table(mem, edit, nullptr); 455 mem->Unref(); 456 mem = nullptr; 457 if (!status.ok()) { 458 // Reflect errors immediately so that conditions like full 459 // file-systems cause the DB::Open() to fail. 460 break; 461 } 462 } 463 } 464 465 delete file; 466 467 // See if we should keep reusing the last log file. 
468 if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { 469 assert(logfile_ == nullptr); 470 assert(log_ == nullptr); 471 assert(mem_ == nullptr); 472 uint64_t lfile_size; 473 if (env_->GetFileSize(fname, &lfile_size).ok() && 474 env_->NewAppendableFile(fname, &logfile_).ok()) { 475 Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); 476 log_ = new log::Writer(logfile_, lfile_size); 477 logfile_number_ = log_number; 478 if (mem != nullptr) { 479 mem_ = mem; 480 mem = nullptr; 481 } else { 482 // mem can be nullptr if lognum exists but was empty. 483 mem_ = new MemTable(internal_comparator_); 484 mem_->Ref(); 485 } 486 } 487 } 488 489 if (mem != nullptr) { 490 // mem did not get reused; compact it. 491 if (status.ok()) { 492 *save_manifest = true; 493 status = WriteLevel0Table(mem, edit, nullptr); 494 } 495 mem->Unref(); 496 } 497 498 return status; 499 } 500 501 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, 502 Version* base) { 503 mutex_.AssertHeld(); 504 const uint64_t start_micros = env_->NowMicros(); 505 FileMetaData meta; 506 meta.number = versions_->NewFileNumber(); 507 pending_outputs_.insert(meta.number); 508 Iterator* iter = mem->NewIterator(); 509 Log(options_.info_log, "Level-0 table #%llu: started", 510 (unsigned long long)meta.number); 511 512 Status s; 513 { 514 mutex_.Unlock(); 515 s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); 516 mutex_.Lock(); 517 } 518 519 Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", 520 (unsigned long long)meta.number, (unsigned long long)meta.file_size, 521 s.ToString().c_str()); 522 delete iter; 523 pending_outputs_.erase(meta.number); 524 525 // Note that if file_size is zero, the file has been deleted and 526 // should not be added to the manifest. 
527 int level = 0; 528 if (s.ok() && meta.file_size > 0) { 529 const Slice min_user_key = meta.smallest.user_key(); 530 const Slice max_user_key = meta.largest.user_key(); 531 if (base != nullptr) { 532 level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); 533 } 534 edit->AddFile(level, meta.number, meta.file_size, meta.smallest, 535 meta.largest); 536 } 537 538 CompactionStats stats; 539 stats.micros = env_->NowMicros() - start_micros; 540 stats.bytes_written = meta.file_size; 541 stats_[level].Add(stats); 542 return s; 543 } 544 545 void DBImpl::CompactMemTable() { 546 mutex_.AssertHeld(); 547 assert(imm_ != nullptr); 548 549 // Save the contents of the memtable as a new Table 550 VersionEdit edit; 551 Version* base = versions_->current(); 552 base->Ref(); 553 Status s = WriteLevel0Table(imm_, &edit, base); 554 base->Unref(); 555 556 if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { 557 s = Status::IOError("Deleting DB during memtable compaction"); 558 } 559 560 // Replace immutable memtable with the generated Table 561 if (s.ok()) { 562 edit.SetPrevLogNumber(0); 563 edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed 564 s = versions_->LogAndApply(&edit, &mutex_); 565 } 566 567 if (s.ok()) { 568 // Commit to the new state 569 imm_->Unref(); 570 imm_ = nullptr; 571 has_imm_.store(false, std::memory_order_release); 572 DeleteObsoleteFiles(); 573 } else { 574 RecordBackgroundError(s); 575 } 576 } 577 578 void DBImpl::CompactRange(const Slice* begin, const Slice* end) { 579 int max_level_with_files = 1; 580 { 581 MutexLock l(&mutex_); 582 Version* base = versions_->current(); 583 for (int level = 1; level < config::kNumLevels; level++) { 584 if (base->OverlapInLevel(level, begin, end)) { 585 max_level_with_files = level; 586 } 587 } 588 } 589 TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap 590 for (int level = 0; level < max_level_with_files; level++) { 591 TEST_CompactRange(level, begin, 
end); 592 } 593 } 594 595 void DBImpl::TEST_CompactRange(int level, const Slice* begin, 596 const Slice* end) { 597 assert(level >= 0); 598 assert(level + 1 < config::kNumLevels); 599 600 InternalKey begin_storage, end_storage; 601 602 ManualCompaction manual; 603 manual.level = level; 604 manual.done = false; 605 if (begin == nullptr) { 606 manual.begin = nullptr; 607 } else { 608 begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); 609 manual.begin = &begin_storage; 610 } 611 if (end == nullptr) { 612 manual.end = nullptr; 613 } else { 614 end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); 615 manual.end = &end_storage; 616 } 617 618 MutexLock l(&mutex_); 619 while (!manual.done && !shutting_down_.load(std::memory_order_acquire) && 620 bg_error_.ok()) { 621 if (manual_compaction_ == nullptr) { // Idle 622 manual_compaction_ = &manual; 623 MaybeScheduleCompaction(); 624 } else { // Running either my compaction or another compaction. 625 background_work_finished_signal_.Wait(); 626 } 627 } 628 if (manual_compaction_ == &manual) { 629 // Cancel my manual compaction since we aborted early for some reason. 
630 manual_compaction_ = nullptr; 631 } 632 } 633 634 Status DBImpl::TEST_CompactMemTable() { 635 // nullptr batch means just wait for earlier writes to be done 636 Status s = Write(WriteOptions(), nullptr); 637 if (s.ok()) { 638 // Wait until the compaction completes 639 MutexLock l(&mutex_); 640 while (imm_ != nullptr && bg_error_.ok()) { 641 background_work_finished_signal_.Wait(); 642 } 643 if (imm_ != nullptr) { 644 s = bg_error_; 645 } 646 } 647 return s; 648 } 649 650 void DBImpl::RecordBackgroundError(const Status& s) { 651 mutex_.AssertHeld(); 652 if (bg_error_.ok()) { 653 bg_error_ = s; 654 background_work_finished_signal_.SignalAll(); 655 } 656 } 657 658 void DBImpl::MaybeScheduleCompaction() { 659 mutex_.AssertHeld(); 660 if (background_compaction_scheduled_) { 661 // Already scheduled 662 } else if (shutting_down_.load(std::memory_order_acquire)) { 663 // DB is being deleted; no more background compactions 664 } else if (!bg_error_.ok()) { 665 // Already got an error; no more changes 666 } else if (imm_ == nullptr && manual_compaction_ == nullptr && 667 !versions_->NeedsCompaction()) { 668 // No work to be done 669 } else { 670 background_compaction_scheduled_ = true; 671 env_->Schedule(&DBImpl::BGWork, this); 672 } 673 } 674 675 void DBImpl::BGWork(void* db) { 676 reinterpret_cast<DBImpl*>(db)->BackgroundCall(); 677 } 678 679 void DBImpl::BackgroundCall() { 680 MutexLock l(&mutex_); 681 assert(background_compaction_scheduled_); 682 if (shutting_down_.load(std::memory_order_acquire)) { 683 // No more background work when shutting down. 684 } else if (!bg_error_.ok()) { 685 // No more background work after a background error. 686 } else { 687 BackgroundCompaction(); 688 } 689 690 background_compaction_scheduled_ = false; 691 692 // Previous compaction may have produced too many files in a level, 693 // so reschedule another compaction if needed. 
694 MaybeScheduleCompaction(); 695 background_work_finished_signal_.SignalAll(); 696 } 697 698 void DBImpl::BackgroundCompaction() { 699 mutex_.AssertHeld(); 700 701 if (imm_ != nullptr) { 702 CompactMemTable(); 703 return; 704 } 705 706 Compaction* c; 707 bool is_manual = (manual_compaction_ != nullptr); 708 InternalKey manual_end; 709 if (is_manual) { 710 ManualCompaction* m = manual_compaction_; 711 c = versions_->CompactRange(m->level, m->begin, m->end); 712 m->done = (c == nullptr); 713 if (c != nullptr) { 714 manual_end = c->input(0, c->num_input_files(0) - 1)->largest; 715 } 716 Log(options_.info_log, 717 "Manual compaction at level-%d from %s .. %s; will stop at %s\n", 718 m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), 719 (m->end ? m->end->DebugString().c_str() : "(end)"), 720 (m->done ? "(end)" : manual_end.DebugString().c_str())); 721 } else { 722 c = versions_->PickCompaction(); 723 } 724 725 Status status; 726 if (c == nullptr) { 727 // Nothing to do 728 } else if (!is_manual && c->IsTrivialMove()) { 729 // Move file to next level 730 assert(c->num_input_files(0) == 1); 731 FileMetaData* f = c->input(0, 0); 732 c->edit()->DeleteFile(c->level(), f->number); 733 c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, 734 f->largest); 735 status = versions_->LogAndApply(c->edit(), &mutex_); 736 if (!status.ok()) { 737 RecordBackgroundError(status); 738 } 739 VersionSet::LevelSummaryStorage tmp; 740 Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", 741 static_cast<unsigned long long>(f->number), c->level() + 1, 742 static_cast<unsigned long long>(f->file_size), 743 status.ToString().c_str(), versions_->LevelSummary(&tmp)); 744 } else { 745 CompactionState* compact = new CompactionState(c); 746 status = DoCompactionWork(compact); 747 if (!status.ok()) { 748 RecordBackgroundError(status); 749 } 750 CleanupCompaction(compact); 751 c->ReleaseInputs(); 752 DeleteObsoleteFiles(); 753 } 754 delete c; 755 
756 if (status.ok()) { 757 // Done 758 } else if (shutting_down_.load(std::memory_order_acquire)) { 759 // Ignore compaction errors found during shutting down 760 } else { 761 Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); 762 } 763 764 if (is_manual) { 765 ManualCompaction* m = manual_compaction_; 766 if (!status.ok()) { 767 m->done = true; 768 } 769 if (!m->done) { 770 // We only compacted part of the requested range. Update *m 771 // to the range that is left to be compacted. 772 m->tmp_storage = manual_end; 773 m->begin = &m->tmp_storage; 774 } 775 manual_compaction_ = nullptr; 776 } 777 } 778 779 void DBImpl::CleanupCompaction(CompactionState* compact) { 780 mutex_.AssertHeld(); 781 if (compact->builder != nullptr) { 782 // May happen if we get a shutdown call in the middle of compaction 783 compact->builder->Abandon(); 784 delete compact->builder; 785 } else { 786 assert(compact->outfile == nullptr); 787 } 788 delete compact->outfile; 789 for (size_t i = 0; i < compact->outputs.size(); i++) { 790 const CompactionState::Output& out = compact->outputs[i]; 791 pending_outputs_.erase(out.number); 792 } 793 delete compact; 794 } 795 796 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 797 assert(compact != nullptr); 798 assert(compact->builder == nullptr); 799 uint64_t file_number; 800 { 801 mutex_.Lock(); 802 file_number = versions_->NewFileNumber(); 803 pending_outputs_.insert(file_number); 804 CompactionState::Output out; 805 out.number = file_number; 806 out.file_size = 0; 807 out.smallest.Clear(); 808 out.largest.Clear(); 809 compact->outputs.push_back(out); 810 mutex_.Unlock(); 811 } 812 813 // Make the output file 814 std::string fname = TableFileName(dbname_, file_number); 815 Status s = env_->NewWritableFile(fname, &compact->outfile); 816 if (s.ok()) { 817 compact->builder = new TableBuilder(options_, compact->outfile); 818 } 819 return s; 820 } 821 822 Status 
DBImpl::FinishCompactionOutputFile(CompactionState* compact, 823 Iterator* input) { 824 assert(compact != nullptr); 825 assert(compact->outfile != nullptr); 826 assert(compact->builder != nullptr); 827 828 const uint64_t output_number = compact->current_output()->number; 829 assert(output_number != 0); 830 831 // Check for iterator errors 832 Status s = input->status(); 833 const uint64_t current_entries = compact->builder->NumEntries(); 834 if (s.ok()) { 835 s = compact->builder->Finish(); 836 } else { 837 compact->builder->Abandon(); 838 } 839 const uint64_t current_bytes = compact->builder->FileSize(); 840 compact->current_output()->file_size = current_bytes; 841 compact->total_bytes += current_bytes; 842 delete compact->builder; 843 compact->builder = nullptr; 844 845 // Finish and check for file errors 846 if (s.ok()) { 847 s = compact->outfile->Sync(); 848 } 849 if (s.ok()) { 850 s = compact->outfile->Close(); 851 } 852 delete compact->outfile; 853 compact->outfile = nullptr; 854 855 if (s.ok() && current_entries > 0) { 856 // Verify that the table is usable 857 Iterator* iter = 858 table_cache_->NewIterator(ReadOptions(), output_number, current_bytes); 859 s = iter->status(); 860 delete iter; 861 if (s.ok()) { 862 Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes", 863 (unsigned long long)output_number, compact->compaction->level(), 864 (unsigned long long)current_entries, 865 (unsigned long long)current_bytes); 866 } 867 } 868 return s; 869 } 870 871 Status DBImpl::InstallCompactionResults(CompactionState* compact) { 872 mutex_.AssertHeld(); 873 Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", 874 compact->compaction->num_input_files(0), compact->compaction->level(), 875 compact->compaction->num_input_files(1), compact->compaction->level() + 1, 876 static_cast<long long>(compact->total_bytes)); 877 878 // Add compaction outputs 879 compact->compaction->AddInputDeletions(compact->compaction->edit()); 880 const int 
level = compact->compaction->level(); 881 for (size_t i = 0; i < compact->outputs.size(); i++) { 882 const CompactionState::Output& out = compact->outputs[i]; 883 compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, 884 out.smallest, out.largest); 885 } 886 return versions_->LogAndApply(compact->compaction->edit(), &mutex_); 887 } 888 889 Status DBImpl::DoCompactionWork(CompactionState* compact) { 890 const uint64_t start_micros = env_->NowMicros(); 891 int64_t imm_micros = 0; // Micros spent doing imm_ compactions 892 893 Log(options_.info_log, "Compacting %d@%d + %d@%d files", 894 compact->compaction->num_input_files(0), compact->compaction->level(), 895 compact->compaction->num_input_files(1), 896 compact->compaction->level() + 1); 897 898 assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); 899 assert(compact->builder == nullptr); 900 assert(compact->outfile == nullptr); 901 if (snapshots_.empty()) { 902 compact->smallest_snapshot = versions_->LastSequence(); 903 } else { 904 compact->smallest_snapshot = snapshots_.oldest()->sequence_number(); 905 } 906 907 Iterator* input = versions_->MakeInputIterator(compact->compaction); 908 909 // Release mutex while we're actually doing the compaction work 910 mutex_.Unlock(); 911 912 input->SeekToFirst(); 913 Status status; 914 ParsedInternalKey ikey; 915 std::string current_user_key; 916 bool has_current_user_key = false; 917 SequenceNumber last_sequence_for_key = kMaxSequenceNumber; 918 while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { 919 // Prioritize immutable compaction work 920 if (has_imm_.load(std::memory_order_relaxed)) { 921 const uint64_t imm_start = env_->NowMicros(); 922 mutex_.Lock(); 923 if (imm_ != nullptr) { 924 CompactMemTable(); 925 // Wake up MakeRoomForWrite() if necessary. 
926 background_work_finished_signal_.SignalAll(); 927 } 928 mutex_.Unlock(); 929 imm_micros += (env_->NowMicros() - imm_start); 930 } 931 932 Slice key = input->key(); 933 if (compact->compaction->ShouldStopBefore(key) && 934 compact->builder != nullptr) { 935 status = FinishCompactionOutputFile(compact, input); 936 if (!status.ok()) { 937 break; 938 } 939 } 940 941 // Handle key/value, add to state, etc. 942 bool drop = false; 943 if (!ParseInternalKey(key, &ikey)) { 944 // Do not hide error keys 945 current_user_key.clear(); 946 has_current_user_key = false; 947 last_sequence_for_key = kMaxSequenceNumber; 948 } else { 949 if (!has_current_user_key || 950 user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 951 0) { 952 // First occurrence of this user key 953 current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); 954 has_current_user_key = true; 955 last_sequence_for_key = kMaxSequenceNumber; 956 } 957 958 if (last_sequence_for_key <= compact->smallest_snapshot) { 959 // Hidden by an newer entry for same user key 960 drop = true; // (A) 961 } else if (ikey.type == kTypeDeletion && 962 ikey.sequence <= compact->smallest_snapshot && 963 compact->compaction->IsBaseLevelForKey(ikey.user_key)) { 964 // For this user key: 965 // (1) there is no data in higher levels 966 // (2) data in lower levels will have larger sequence numbers 967 // (3) data in layers that are being compacted here and have 968 // smaller sequence numbers will be dropped in the next 969 // few iterations of this loop (by rule (A) above). 970 // Therefore this deletion marker is obsolete and can be dropped. 
971 drop = true; 972 } 973 974 last_sequence_for_key = ikey.sequence; 975 } 976 #if 0 977 Log(options_.info_log, 978 " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " 979 "%d smallest_snapshot: %d", 980 ikey.user_key.ToString().c_str(), 981 (int)ikey.sequence, ikey.type, kTypeValue, drop, 982 compact->compaction->IsBaseLevelForKey(ikey.user_key), 983 (int)last_sequence_for_key, (int)compact->smallest_snapshot); 984 #endif 985 986 if (!drop) { 987 // Open output file if necessary 988 if (compact->builder == nullptr) { 989 status = OpenCompactionOutputFile(compact); 990 if (!status.ok()) { 991 break; 992 } 993 } 994 if (compact->builder->NumEntries() == 0) { 995 compact->current_output()->smallest.DecodeFrom(key); 996 } 997 compact->current_output()->largest.DecodeFrom(key); 998 compact->builder->Add(key, input->value()); 999 1000 // Close output file if it is big enough 1001 if (compact->builder->FileSize() >= 1002 compact->compaction->MaxOutputFileSize()) { 1003 status = FinishCompactionOutputFile(compact, input); 1004 if (!status.ok()) { 1005 break; 1006 } 1007 } 1008 } 1009 1010 input->Next(); 1011 } 1012 1013 if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { 1014 status = Status::IOError("Deleting DB during compaction"); 1015 } 1016 if (status.ok() && compact->builder != nullptr) { 1017 status = FinishCompactionOutputFile(compact, input); 1018 } 1019 if (status.ok()) { 1020 status = input->status(); 1021 } 1022 delete input; 1023 input = nullptr; 1024 1025 CompactionStats stats; 1026 stats.micros = env_->NowMicros() - start_micros - imm_micros; 1027 for (int which = 0; which < 2; which++) { 1028 for (int i = 0; i < compact->compaction->num_input_files(which); i++) { 1029 stats.bytes_read += compact->compaction->input(which, i)->file_size; 1030 } 1031 } 1032 for (size_t i = 0; i < compact->outputs.size(); i++) { 1033 stats.bytes_written += compact->outputs[i].file_size; 1034 } 1035 1036 mutex_.Lock(); 1037 
  // Output files land one level below the compaction's input level, so the
  // work is charged to level+1.
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}

namespace {

// State pinned for the lifetime of an iterator returned by
// NewInternalIterator(): the memtables and version it reads from are
// Ref()'d at creation and Unref()'d by CleanupIteratorState() below.
struct IterState {
  port::Mutex* const mu;
  Version* const version GUARDED_BY(mu);
  MemTable* const mem GUARDED_BY(mu);
  MemTable* const imm GUARDED_BY(mu);

  IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
      : mu(mutex), version(version), mem(mem), imm(imm) {}
};

// Iterator cleanup callback: drops the references held by an IterState.
// arg2 is unused (RegisterCleanup passes nullptr for it).
static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != nullptr) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}

}  // anonymous namespace

// Builds a merging iterator over the mutable memtable, the immutable
// memtable (if any), and all sstables of the current version.  Outputs the
// latest sequence number and a per-iterator sampling seed.  The underlying
// data is pinned via IterState until the iterator is destroyed.
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != nullptr) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}

Iterator*
DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  uint32_t ignored_seed;
  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->MaxNextLevelOverlappingBytes();
}

// Point lookup.  Reads at the caller-supplied snapshot if one is set,
// otherwise at the latest sequence number.  The mutex is dropped while the
// actual lookup runs; the memtables and version consulted are Ref()'d first
// so they stay alive across the unlocked section.
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      // Fall through to the sstables; Version::Get records which file was
      // examined first so seek-triggered compactions can be scheduled.
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(this, user_comparator(), iter,
                       (options.snapshot != nullptr
                            ? static_cast<const SnapshotImpl*>(options.snapshot)
                                  ->sequence_number()
                            : latest_snapshot),
                       seed);
}

// Feeds a read-sampling hit into the current version; may trigger a
// seek-based compaction.
void DBImpl::RecordReadSample(Slice key) {
  MutexLock l(&mutex_);
  if (versions_->current()->RecordReadSample(key)) {
    MaybeScheduleCompaction();
  }
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
  MutexLock l(&mutex_);
  snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
}

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}

// Group-commit write path.  Each caller queues a Writer; the thread whose
// Writer reaches the front of the queue becomes the leader, batches up
// work from followers (BuildBatchGroup), performs the log write and
// memtable insert for the whole group, then signals the followers.
// A nullptr `updates` is a special request (used by compaction tests) to
// just make room without writing.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    // A previous leader already performed this write on our behalf.
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    // tmp_batch_ is a reusable scratch batch; clear it if it was used to
    // assemble the group.
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  // Pop every writer whose batch we handled, propagating the shared status,
  // and wake each follower so it can return.
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

// Merges the front writer's batch with as many queued followers' batches as
// fit within the size limit, and advances *last_writer to the last writer
// whose batch was absorbed.
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

// Ensures there is space in the memtable for an incoming write, applying
// back-pressure as L0 fills up: first a 1ms slowdown, then a hard stall,
// and finally a memtable switch (new log file + compaction of the old
// memtable).  If `force` is true, a memtable switch is performed even if
// the current memtable has room.
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      // Retire the full memtable: it becomes immutable (imm_) and a fresh
      // mutable memtable takes its place.  The release store pairs with the
      // relaxed/acquire loads by the background thread.
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

// Answers "leveldb."-prefixed property queries:
//   num-files-at-level<N>, stats, sstables, approximate-memory-usage.
// Returns false for unknown properties; on success fills *value.
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n");
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
                 files, versions_->NumLevelBytes(level) / 1048576.0,
                 stats_[level].micros / 1e6,
                 stats_[level].bytes_read / 1048576.0,
                 stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value =
        versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    // Block cache charge plus both memtables' arena usage.
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    snprintf(buf, sizeof(buf), "%llu",
             static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }

  return false;
}

// Estimates the on-disk size of each of the n key ranges by differencing
// approximate sstable offsets in the current version.  Memtable contents
// are not counted.
void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
  // TODO(opt): better implementation
  MutexLock l(&mutex_);
  Version* v = versions_->current();
  v->Ref();

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ?
                    limit - start : 0);
  }

  v->Unref();
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

DB::~DB() = default;

// Opens (and, per options, creates) the database at `dbname`.  On success
// *dbptr owns the new DBImpl; on failure *dbptr is nullptr and the partially
// constructed instance is destroyed.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

Snapshot::~Snapshot() = default;

// Removes the database directory and all files it recognizes.  A missing
// directory is not an error.  The DB lock file is taken first so a live DB
// cannot be destroyed out from under itself, and is deleted last.
Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  Status result = env->GetChildren(dbname, &filenames);
  if (!result.ok()) {
    // Ignore error in case directory does not exist
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          // Remember the first deletion failure but keep going.
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}

}  // namespace leveldb