// src/leveldb/db/db_impl.cc
   1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style license that can be
   3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
   4  
   5  #include "db/db_impl.h"
   6  
   7  #include <stdint.h>
   8  #include <stdio.h>
   9  
  10  #include <algorithm>
  11  #include <atomic>
  12  #include <set>
  13  #include <string>
  14  #include <vector>
  15  
  16  #include "db/builder.h"
  17  #include "db/db_iter.h"
  18  #include "db/dbformat.h"
  19  #include "db/filename.h"
  20  #include "db/log_reader.h"
  21  #include "db/log_writer.h"
  22  #include "db/memtable.h"
  23  #include "db/table_cache.h"
  24  #include "db/version_set.h"
  25  #include "db/write_batch_internal.h"
  26  #include "leveldb/db.h"
  27  #include "leveldb/env.h"
  28  #include "leveldb/status.h"
  29  #include "leveldb/table.h"
  30  #include "leveldb/table_builder.h"
  31  #include "port/port.h"
  32  #include "table/block.h"
  33  #include "table/merger.h"
  34  #include "table/two_level_iterator.h"
  35  #include "util/coding.h"
  36  #include "util/logging.h"
  37  #include "util/mutexlock.h"
  38  
  39  namespace leveldb {
  40  
// Number of open-file slots reserved for uses other than cached table files
// (log file, MANIFEST, CURRENT, info log, etc.); see TableCacheSize().
const int kNumNonTableCacheFiles = 10;
  42  
// Information kept for every waiting writer
struct DBImpl::Writer {
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), done(false), cv(mu) {}

  Status status;      // Result reported to this writer once done is true.
  WriteBatch* batch;  // Updates to apply; nullptr means "just wait for earlier
                      // writes to be done" (see TEST_CompactMemTable()).
  bool sync;          // Presumably mirrors WriteOptions::sync — confirm in
                      // DBImpl::Write() (not in view here).
  bool done;          // Set once this writer's batch has been applied.
  port::CondVar cv;   // Tied to the DB mutex; this writer blocks on it.
};
  54  
// Per-compaction scratch state: the output files produced so far plus the
// file/builder pair for the output currently being generated.
struct DBImpl::CompactionState {
  // Files produced by compaction
  struct Output {
    uint64_t number;     // Table file number.
    uint64_t file_size;  // Bytes written; 0 until the file is finished.
    InternalKey smallest, largest;  // Key range covered by this output file.
  };

  // The output currently being generated (always the last element).
  Output* current_output() { return &outputs[outputs.size() - 1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        smallest_snapshot(0),
        outfile(nullptr),
        builder(nullptr),
        total_bytes(0) {}

  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  // Total bytes written across all outputs of this compaction.
  uint64_t total_bytes;
};
  88  
  89  // Fix user-supplied options to be reasonable
  90  template <class T, class V>
  91  static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  92    if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  93    if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
  94  }
  95  Options SanitizeOptions(const std::string& dbname,
  96                          const InternalKeyComparator* icmp,
  97                          const InternalFilterPolicy* ipolicy,
  98                          const Options& src) {
  99    Options result = src;
 100    result.comparator = icmp;
 101    result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
 102    ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
 103    ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
 104    ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
 105    ClipToRange(&result.block_size, 1 << 10, 4 << 20);
 106    if (result.info_log == nullptr) {
 107      // Open a log file in the same directory as the db
 108      src.env->CreateDir(dbname);  // In case it does not exist
 109      src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
 110      Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
 111      if (!s.ok()) {
 112        // No place suitable for logging
 113        result.info_log = nullptr;
 114      }
 115    }
 116    if (result.block_cache == nullptr) {
 117      result.block_cache = NewLRUCache(8 << 20);
 118    }
 119    return result;
 120  }
 121  
 122  static int TableCacheSize(const Options& sanitized_options) {
 123    // Reserve ten files or so for other uses and give the rest to TableCache.
 124    return sanitized_options.max_open_files - kNumNonTableCacheFiles;
 125  }
 126  
// Construct the DB object.  No I/O happens here except what
// SanitizeOptions() does (info-log creation); real startup work is in
// Recover(), which the caller invokes separately.
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      // SanitizeOptions() may substitute its own info_log / block_cache;
      // the owns_* flags below record whether ~DBImpl() must delete them.
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      db_lock_(nullptr),
      shutting_down_(false),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr),
      imm_(nullptr),
      has_imm_(false),
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch),
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}
 152  
DBImpl::~DBImpl() {
  // Wait for background work to finish.  Setting shutting_down_ makes
  // background compactions bail out at their next check.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  // Only delete the info log / block cache if SanitizeOptions() created
  // them; otherwise they still belong to the caller.
  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}
 181  
// Create a brand-new database: write an initial MANIFEST (descriptor) file
// and point CURRENT at it.  Called from Recover() when the DB does not yet
// exist and create_if_missing is set.
Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);  // File number 1 is taken by the manifest below.
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    // Scope ensures the log::Writer is destroyed before the file is deleted.
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    // Do not leave a partially written manifest behind.
    env_->DeleteFile(manifest);
  }
  return s;
}
 213  
 214  void DBImpl::MaybeIgnoreError(Status* s) const {
 215    if (s->ok() || options_.paranoid_checks) {
 216      // No change needed
 217    } else {
 218      Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
 219      *s = Status::OK();
 220    }
 221  }
 222  
// Scan the db directory and delete files that are no longer needed: stale
// logs, superseded manifests, and table/temp files that are neither live in
// a version nor listed in pending_outputs_.  Requires mutex_ held; the
// mutex is temporarily dropped while the actual deletions run.
void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    // After a background error, we don't know whether a new version may
    // or may not have been committed, so we cannot safely garbage collect.
    return;
  }

  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  std::vector<std::string> files_to_delete;
  for (std::string& filename : filenames) {
    if (ParseFileName(filename, &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          // Keep the current log and (legacy) previous log.
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        files_to_delete.push_back(std::move(filename));
        if (type == kTableFile) {
          // Drop any cached handle for the dead table file.
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
            static_cast<unsigned long long>(number));
      }
    }
  }

  // While deleting all files unblock other threads. All files being deleted
  // have unique names which will not collide with newly created files and
  // are therefore safe to delete while allowing other threads to proceed.
  mutex_.Unlock();
  for (const std::string& filename : files_to_delete) {
    env_->DeleteFile(dbname_ + "/" + filename);
  }
  mutex_.Lock();
}
 289  
// Bring the DB to a usable state at open time: take the directory lock,
// create the DB if allowed, recover version state from the manifest, verify
// that all expected files exist, and replay any newer log files.  On
// success *save_manifest tells the caller whether a new manifest must be
// written.  Requires mutex_ held.
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  // CURRENT existing is the commit point for DB creation.
  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(dbname_,
                                     "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  // Every file named live by the recovered version must be present on disk;
  // anything still in "expected" after the scan indicates corruption.
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}
 380  
// Replay one log file into a memtable, flushing to level-0 tables whenever
// the memtable fills.  Updates *max_sequence with the largest sequence
// number seen.  If this is the last log, no flush happened, and
// options_.reuse_logs is set, the log file and memtable are kept as the
// live log/memtable instead of being flushed.  Requires mutex_ held.
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
                              bool* save_manifest, VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  // Forwards corruption reports from log::Reader into the info log, and
  // into *status when paranoid_checks is on.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // null if options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""), fname,
          static_cast<int>(bytes), s.ToString().c_str());
      // Only the first error is kept.
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long)log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;
  MemTable* mem = nullptr;
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    if (record.size() < 12) {
      // 12 bytes is presumably the minimum WriteBatch header size — see
      // WriteBatchInternal (not in view here).
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small", fname));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                    WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      // Memtable is full: flush it to a level-0 table and start a new one.
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

  // See if we should keep reusing the last log file.
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
    assert(logfile_ == nullptr);
    assert(log_ == nullptr);
    assert(mem_ == nullptr);
    uint64_t lfile_size;
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
        env_->NewAppendableFile(fname, &logfile_).ok()) {
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
      log_ = new log::Writer(logfile_, lfile_size);
      logfile_number_ = log_number;
      if (mem != nullptr) {
        // Adopt the replayed memtable as the live memtable.
        mem_ = mem;
        mem = nullptr;
      } else {
        // mem can be nullptr if lognum exists but was empty.
        mem_ = new MemTable(internal_comparator_);
        mem_->Ref();
      }
    }
  }

  if (mem != nullptr) {
    // mem did not get reused; compact it.
    if (status.ok()) {
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
    }
    mem->Unref();
  }

  return status;
}
 500  
// Build a table file from the contents of *mem.  The mutex is released
// while the table is written.  On success (and non-empty output) the file
// is registered in *edit at a level chosen from base (level 0 when base is
// null).  Requires mutex_ held on entry and exit.
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  // Protect the half-written file from DeleteObsoleteFiles().
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long)meta.number);

  Status s;
  {
    // Drop the mutex for the duration of the (slow) table build.
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long)meta.number, (unsigned long long)meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != nullptr) {
      // Presumably may place the file deeper than level 0 when it does not
      // overlap — see Version::PickLevelForMemTableOutput.
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
                  meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}
 544  
// Flush the immutable memtable imm_ to a level-0 table and install the
// result in a new version.  On failure the error is recorded via
// RecordBackgroundError().  Requires mutex_ held.
void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  // WriteLevel0Table() dropped the mutex, so shutdown may have begun.
  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
 577  
 578  void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
 579    int max_level_with_files = 1;
 580    {
 581      MutexLock l(&mutex_);
 582      Version* base = versions_->current();
 583      for (int level = 1; level < config::kNumLevels; level++) {
 584        if (base->OverlapInLevel(level, begin, end)) {
 585          max_level_with_files = level;
 586        }
 587      }
 588    }
 589    TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
 590    for (int level = 0; level < max_level_with_files; level++) {
 591      TEST_CompactRange(level, begin, end);
 592    }
 593  }
 594  
// Force a compaction of `level` restricted to [begin,end] (either bound may
// be null for "unbounded").  Blocks until the manual compaction finishes,
// the DB starts shutting down, or a background error is recorded.
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
                               const Slice* end) {
  assert(level >= 0);
  assert(level + 1 < config::kNumLevels);

  // Keeps the InternalKey bounds alive for as long as `manual` points at them.
  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.level = level;
  manual.done = false;
  if (begin == nullptr) {
    manual.begin = nullptr;
  } else {
    // kMaxSequenceNumber so the bound covers all entries for *begin
    // (internal keys with larger sequence numbers sort first — see
    // dbformat.h).
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == nullptr) {
    manual.end = nullptr;
  } else {
    // Sequence number 0 so the bound covers all entries for *end.
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);
  while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
         bg_error_.ok()) {
    if (manual_compaction_ == nullptr) {  // Idle
      manual_compaction_ = &manual;
      MaybeScheduleCompaction();
    } else {  // Running either my compaction or another compaction.
      background_work_finished_signal_.Wait();
    }
  }
  if (manual_compaction_ == &manual) {
    // Cancel my manual compaction since we aborted early for some reason.
    manual_compaction_ = nullptr;
  }
}
 633  
 634  Status DBImpl::TEST_CompactMemTable() {
 635    // nullptr batch means just wait for earlier writes to be done
 636    Status s = Write(WriteOptions(), nullptr);
 637    if (s.ok()) {
 638      // Wait until the compaction completes
 639      MutexLock l(&mutex_);
 640      while (imm_ != nullptr && bg_error_.ok()) {
 641        background_work_finished_signal_.Wait();
 642      }
 643      if (imm_ != nullptr) {
 644        s = bg_error_;
 645      }
 646    }
 647    return s;
 648  }
 649  
 650  void DBImpl::RecordBackgroundError(const Status& s) {
 651    mutex_.AssertHeld();
 652    if (bg_error_.ok()) {
 653      bg_error_ = s;
 654      background_work_finished_signal_.SignalAll();
 655    }
 656  }
 657  
// Schedule a background compaction if work exists (immutable memtable,
// pending manual compaction, or the version set wants one) and none is
// already scheduled.  Requires mutex_ held.
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
 674  
 675  void DBImpl::BGWork(void* db) {
 676    reinterpret_cast<DBImpl*>(db)->BackgroundCall();
 677  }
 678  
// Entry point of the scheduled background work: run one compaction step,
// then reschedule if more work remains, and wake anyone waiting on
// background_work_finished_signal_.
void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}
 697  
// Perform one unit of background compaction work: flush imm_ if present,
// otherwise run the pending manual compaction or an automatically picked
// one.  Requires mutex_ held.
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  // Flushing the immutable memtable always takes priority.
  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      // A manual compaction may cover only part of the requested range;
      // remember where this round will stop.
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
  } else {
    // Full compaction: merge the inputs and write new table files.
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      // Give up on the rest of the range after an error.
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = nullptr;
  }
}
 778  
 779  void DBImpl::CleanupCompaction(CompactionState* compact) {
 780    mutex_.AssertHeld();
 781    if (compact->builder != nullptr) {
 782      // May happen if we get a shutdown call in the middle of compaction
 783      compact->builder->Abandon();
 784      delete compact->builder;
 785    } else {
 786      assert(compact->outfile == nullptr);
 787    }
 788    delete compact->outfile;
 789    for (size_t i = 0; i < compact->outputs.size(); i++) {
 790      const CompactionState::Output& out = compact->outputs[i];
 791      pending_outputs_.erase(out.number);
 792    }
 793    delete compact;
 794  }
 795  
 796  Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
 797    assert(compact != nullptr);
 798    assert(compact->builder == nullptr);
 799    uint64_t file_number;
 800    {
 801      mutex_.Lock();
 802      file_number = versions_->NewFileNumber();
 803      pending_outputs_.insert(file_number);
 804      CompactionState::Output out;
 805      out.number = file_number;
 806      out.file_size = 0;
 807      out.smallest.Clear();
 808      out.largest.Clear();
 809      compact->outputs.push_back(out);
 810      mutex_.Unlock();
 811    }
 812  
 813    // Make the output file
 814    std::string fname = TableFileName(dbname_, file_number);
 815    Status s = env_->NewWritableFile(fname, &compact->outfile);
 816    if (s.ok()) {
 817      compact->builder = new TableBuilder(options_, compact->outfile);
 818    }
 819    return s;
 820  }
 821  
// Finish the compaction output currently being built: flush the builder,
// sync and close the file, record its size, and verify that the resulting
// table is readable.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile != nullptr);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    // Input iterator failed; discard the partially built table.
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = nullptr;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = nullptr;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter =
        table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
          (unsigned long long)output_number, compact->compaction->level(),
          (unsigned long long)current_entries,
          (unsigned long long)current_bytes);
    }
  }
  return s;
}
 870  
 871  Status DBImpl::InstallCompactionResults(CompactionState* compact) {
 872    mutex_.AssertHeld();
 873    Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
 874        compact->compaction->num_input_files(0), compact->compaction->level(),
 875        compact->compaction->num_input_files(1), compact->compaction->level() + 1,
 876        static_cast<long long>(compact->total_bytes));
 877  
 878    // Add compaction outputs
 879    compact->compaction->AddInputDeletions(compact->compaction->edit());
 880    const int level = compact->compaction->level();
 881    for (size_t i = 0; i < compact->outputs.size(); i++) {
 882      const CompactionState::Output& out = compact->outputs[i];
 883      compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
 884                                           out.smallest, out.largest);
 885    }
 886    return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 887  }
 888  
// Performs the compaction described by compact->compaction: merges the
// input files, drops entries that are shadowed or obsolete, writes the
// survivors to new output tables, and installs the result in the
// manifest.  REQUIRES: mutex_ held on entry; it is released while the
// bulk of the merge runs and re-acquired before returning.
Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0), compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(compact->outfile == nullptr);
  // Entries at or below smallest_snapshot are invisible to every live
  // snapshot, so shadowed duplicates of a key may be dropped below.
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  Iterator* input = versions_->MakeInputIterator(compact->compaction);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    // Prioritize immutable compaction work
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    // Cut the current output file here if the compaction asks for a
    // break before this key.
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
              0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
        drop = true;  // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == nullptr) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  // Treat shutdown as an error so partial results are not installed.
  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = nullptr;

  // Gather per-level stats: bytes read from each input level, bytes
  // written to the outputs, and wall time excluding any memtable
  // compactions performed inside the loop above.
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}
1049  
namespace {

// Bundles the references (memtables and version) that keep an internal
// iterator's data sources alive, along with the mutex that guards their
// ref-counts.  Created in NewInternalIterator and destroyed by
// CleanupIteratorState when the iterator is deleted.
struct IterState {
  port::Mutex* const mu;
  Version* const version GUARDED_BY(mu);
  MemTable* const mem GUARDED_BY(mu);
  MemTable* const imm GUARDED_BY(mu);

  IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
      : mu(mutex), version(version), mem(mem), imm(imm) {}
};

// Iterator cleanup callback: drops the refs held by the IterState passed
// as arg1 (arg2 is unused), under the DB mutex, then deletes the state.
static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != nullptr) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}

}  // anonymous namespace
1073  
// Builds a merging iterator over the active memtable, the immutable
// memtable (if any), and the sstables of the current version.  Stores
// the latest sequence number in *latest_snapshot and a fresh value of
// seed_ in *seed.  Every data source is Ref()'d here; the matching
// Unref()s happen in CleanupIteratorState when the iterator is deleted.
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != nullptr) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}
1100  
1101  Iterator* DBImpl::TEST_NewInternalIterator() {
1102    SequenceNumber ignored;
1103    uint32_t ignored_seed;
1104    return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1105  }
1106  
1107  int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1108    MutexLock l(&mutex_);
1109    return versions_->MaxNextLevelOverlappingBytes();
1110  }
1111  
// Looks up "key" and, on success, stores its value in *value.  The read
// is served at the caller-supplied snapshot if one is given, otherwise
// at the latest sequence number.  Sources are consulted in order: active
// memtable, immutable memtable (if any), then the sstables of the
// current version.  The mutex is dropped during the actual lookups; refs
// taken below keep the sources alive meanwhile.
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  // Pin the data sources so they survive while the mutex is released.
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  // A read that touched files may make one of them a compaction candidate.
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}
1158  
1159  Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1160    SequenceNumber latest_snapshot;
1161    uint32_t seed;
1162    Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1163    return NewDBIterator(this, user_comparator(), iter,
1164                         (options.snapshot != nullptr
1165                              ? static_cast<const SnapshotImpl*>(options.snapshot)
1166                                    ->sequence_number()
1167                              : latest_snapshot),
1168                         seed);
1169  }
1170  
1171  void DBImpl::RecordReadSample(Slice key) {
1172    MutexLock l(&mutex_);
1173    if (versions_->current()->RecordReadSample(key)) {
1174      MaybeScheduleCompaction();
1175    }
1176  }
1177  
1178  const Snapshot* DBImpl::GetSnapshot() {
1179    MutexLock l(&mutex_);
1180    return snapshots_.New(versions_->LastSequence());
1181  }
1182  
1183  void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
1184    MutexLock l(&mutex_);
1185    snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
1186  }
1187  
// Convenience methods
// Forwards to the default DB::Put, which wraps the key/value pair in a
// WriteBatch and commits it through Write().
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
1192  
// Forwards to the default DB::Delete, which wraps the deletion in a
// WriteBatch and commits it through Write().
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}
1196  
// Applies "updates" to the database.  A nullptr batch only forces room
// to be made (used by compaction tests/internal callers).  Writers form
// a queue: the writer at the front performs the log append and memtable
// insert for itself and for any followers merged in by BuildBatchGroup,
// then marks those followers done and wakes the new queue head.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // Wait until this writer reaches the front of the queue, or until a
  // front writer has already committed our batch as part of its group.
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    // Another writer committed this batch on our behalf.
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    // BuildBatchGroup may have merged followers into tmp_batch_; reset it
    // for reuse by the next group leader.
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  // Pop every writer whose batch we committed (up to last_writer),
  // handing each our status and waking it.
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
1269  
// Merges the front writer's batch with subsequent queued batches into a
// single group commit.  Sets *last_writer to the last writer whose batch
// was included.  Returns either the front writer's own batch (group of
// one) or tmp_batch_ holding the concatenation; the caller must Clear()
// tmp_batch_ after use.
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    // A nullptr batch (compaction trigger) contributes nothing but is
    // still recorded via *last_writer below.
    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
1319  
// Ensures there is room for a write.  May: yield a stored background
// error, delay 1ms once to throttle ahead of the L0 slowdown trigger,
// return immediately if the memtable has space, wait for background work
// (memtable compaction or too many L0 files), or switch to a fresh log
// file and memtable, scheduling compaction of the old one.  With
// force==true the in-memtable-has-room fast path is skipped so a switch
// is forced.
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      // Replace the log writer/file with the new ones; the old memtable
      // becomes imm_ and will be compacted in the background.
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
1383  
// Exports DB state for introspection.  Supported properties (all with
// the "leveldb." prefix): "num-files-at-level<N>", "stats", "sstables",
// and "approximate-memory-usage".  Stores the result in *value and
// returns true; returns false for unrecognized property names.
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n");
    value->append(buf);
    // One row per level that has files or has recorded compaction work.
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
                 files, versions_->NumLevelBytes(level) / 1048576.0,
                 stats_[level].micros / 1e6,
                 stats_[level].bytes_read / 1048576.0,
                 stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    // Block cache charge plus both memtables' approximate usage.
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    snprintf(buf, sizeof(buf), "%llu",
             static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }

  return false;
}
1445  
1446  void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
1447    // TODO(opt): better implementation
1448    MutexLock l(&mutex_);
1449    Version* v = versions_->current();
1450    v->Ref();
1451  
1452    for (int i = 0; i < n; i++) {
1453      // Convert user_key into a corresponding internal key.
1454      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1455      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1456      uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1457      uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1458      sizes[i] = (limit >= start ? limit - start : 0);
1459    }
1460  
1461    v->Unref();
1462  }
1463  
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
// Wraps a single key/value insertion in a WriteBatch and commits it via
// the virtual Write().
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
1471  
// Wraps a single deletion in a WriteBatch and commits it via the
// virtual Write().
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}
1477  
// Out-of-line defaulted definition of the base-class destructor.
DB::~DB() = default;
1479  
// Opens (creating or recovering as needed) the database at "dbname" and
// returns a heap-allocated DBImpl via *dbptr on success.  After recovery,
// a fresh log file and memtable are created if none were reused; a
// manifest write is issued if recovery requested one.  On any failure
// the partially-constructed impl is deleted and *dbptr stays nullptr.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    // Clean up leftovers from recovery and kick off any pending work.
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
1522  
// Out-of-line defaulted definition of the base-class destructor.
Snapshot::~Snapshot() = default;
1524  
1525  Status DestroyDB(const std::string& dbname, const Options& options) {
1526    Env* env = options.env;
1527    std::vector<std::string> filenames;
1528    Status result = env->GetChildren(dbname, &filenames);
1529    if (!result.ok()) {
1530      // Ignore error in case directory does not exist
1531      return Status::OK();
1532    }
1533  
1534    FileLock* lock;
1535    const std::string lockname = LockFileName(dbname);
1536    result = env->LockFile(lockname, &lock);
1537    if (result.ok()) {
1538      uint64_t number;
1539      FileType type;
1540      for (size_t i = 0; i < filenames.size(); i++) {
1541        if (ParseFileName(filenames[i], &number, &type) &&
1542            type != kDBLockFile) {  // Lock file will be deleted at end
1543          Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1544          if (result.ok() && !del.ok()) {
1545            result = del;
1546          }
1547        }
1548      }
1549      env->UnlockFile(lock);  // Ignore error since state is already gone
1550      env->DeleteFile(lockname);
1551      env->DeleteDir(dbname);  // Ignore error in case dir contains other files
1552    }
1553    return result;
1554  }
1555  
1556  }  // namespace leveldb