/ src / leveldb / db / db_impl.cc
db_impl.cc
   1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style license that can be
   3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
   4  
   5  #include "db/db_impl.h"
   6  
   7  #include <stdint.h>
   8  #include <stdio.h>
   9  
  10  #include <algorithm>
  11  #include <atomic>
  12  #include <set>
  13  #include <string>
  14  #include <vector>
  15  
  16  #include "db/builder.h"
  17  #include "db/db_iter.h"
  18  #include "db/dbformat.h"
  19  #include "db/filename.h"
  20  #include "db/log_reader.h"
  21  #include "db/log_writer.h"
  22  #include "db/memtable.h"
  23  #include "db/table_cache.h"
  24  #include "db/version_set.h"
  25  #include "db/write_batch_internal.h"
  26  #include "leveldb/db.h"
  27  #include "leveldb/env.h"
  28  #include "leveldb/status.h"
  29  #include "leveldb/table.h"
  30  #include "leveldb/table_builder.h"
  31  #include "port/port.h"
  32  #include "table/block.h"
  33  #include "table/merger.h"
  34  #include "table/two_level_iterator.h"
  35  #include "util/coding.h"
  36  #include "util/logging.h"
  37  #include "util/mutexlock.h"
  38  
  39  namespace leveldb {
  40  
  41  const int kNumNonTableCacheFiles = 10;
  42  
  43  // Information kept for every waiting writer
  44  struct DBImpl::Writer {
  45    explicit Writer(port::Mutex* mu)
  46        : batch(nullptr), sync(false), done(false), cv(mu) {}
  47  
  48    Status status;
  49    WriteBatch* batch;
  50    bool sync;
  51    bool done;
  52    port::CondVar cv;
  53  };
  54  
  55  struct DBImpl::CompactionState {
  56    // Files produced by compaction
  57    struct Output {
  58      uint64_t number;
  59      uint64_t file_size;
  60      InternalKey smallest, largest;
  61    };
  62  
  63    Output* current_output() { return &outputs[outputs.size() - 1]; }
  64  
  65    explicit CompactionState(Compaction* c)
  66        : compaction(c),
  67          smallest_snapshot(0),
  68          outfile(nullptr),
  69          builder(nullptr),
  70          total_bytes(0) {}
  71  
  72    Compaction* const compaction;
  73  
  74    // Sequence numbers < smallest_snapshot are not significant since we
  75    // will never have to service a snapshot below smallest_snapshot.
  76    // Therefore if we have seen a sequence number S <= smallest_snapshot,
  77    // we can drop all entries for the same key with sequence numbers < S.
  78    SequenceNumber smallest_snapshot;
  79  
  80    std::vector<Output> outputs;
  81  
  82    // State kept for output being generated
  83    WritableFile* outfile;
  84    TableBuilder* builder;
  85  
  86    uint64_t total_bytes;
  87  };
  88  
  89  // Fix user-supplied options to be reasonable
  90  template <class T, class V>
  91  static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  92    if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  93    if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
  94  }
  95  Options SanitizeOptions(const std::string& dbname,
  96                          const InternalKeyComparator* icmp,
  97                          const InternalFilterPolicy* ipolicy,
  98                          const Options& src) {
  99    Options result = src;
 100    result.comparator = icmp;
 101    result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
 102    ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
 103    ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
 104    ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
 105    ClipToRange(&result.block_size, 1 << 10, 4 << 20);
 106    if (result.info_log == nullptr) {
 107      // Open a log file in the same directory as the db
 108      src.env->CreateDir(dbname);  // In case it does not exist
 109      src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
 110      Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
 111      if (!s.ok()) {
 112        // No place suitable for logging
 113        result.info_log = nullptr;
 114      }
 115    }
 116    if (result.block_cache == nullptr) {
 117      result.block_cache = NewLRUCache(8 << 20);
 118    }
 119    return result;
 120  }
 121  
 122  static int TableCacheSize(const Options& sanitized_options) {
 123    // Reserve ten files or so for other uses and give the rest to TableCache.
 124    return sanitized_options.max_open_files - kNumNonTableCacheFiles;
 125  }
 126  
 127  DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
 128      : env_(raw_options.env),
 129        internal_comparator_(raw_options.comparator),
 130        internal_filter_policy_(raw_options.filter_policy),
 131        options_(SanitizeOptions(dbname, &internal_comparator_,
 132                                 &internal_filter_policy_, raw_options)),
 133        owns_info_log_(options_.info_log != raw_options.info_log),
 134        owns_cache_(options_.block_cache != raw_options.block_cache),
 135        dbname_(dbname),
 136        table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
 137        db_lock_(nullptr),
 138        shutting_down_(false),
 139        background_work_finished_signal_(&mutex_),
 140        mem_(nullptr),
 141        imm_(nullptr),
 142        has_imm_(false),
 143        logfile_(nullptr),
 144        logfile_number_(0),
 145        log_(nullptr),
 146        seed_(0),
 147        tmp_batch_(new WriteBatch),
 148        background_compaction_scheduled_(false),
 149        manual_compaction_(nullptr),
 150        versions_(new VersionSet(dbname_, &options_, table_cache_,
 151                                 &internal_comparator_)) {}
 152  
 153  DBImpl::~DBImpl() {
 154    // Wait for background work to finish.
 155    mutex_.Lock();
 156    shutting_down_.store(true, std::memory_order_release);
 157    while (background_compaction_scheduled_) {
 158      background_work_finished_signal_.Wait();
 159    }
 160    mutex_.Unlock();
 161  
 162    if (db_lock_ != nullptr) {
 163      env_->UnlockFile(db_lock_);
 164    }
 165  
 166    delete versions_;
 167    if (mem_ != nullptr) mem_->Unref();
 168    if (imm_ != nullptr) imm_->Unref();
 169    delete tmp_batch_;
 170    delete log_;
 171    delete logfile_;
 172    delete table_cache_;
 173  
 174    if (owns_info_log_) {
 175      delete options_.info_log;
 176    }
 177    if (owns_cache_) {
 178      delete options_.block_cache;
 179    }
 180  }
 181  
 182  Status DBImpl::NewDB() {
 183    VersionEdit new_db;
 184    new_db.SetComparatorName(user_comparator()->Name());
 185    new_db.SetLogNumber(0);
 186    new_db.SetNextFile(2);
 187    new_db.SetLastSequence(0);
 188  
 189    const std::string manifest = DescriptorFileName(dbname_, 1);
 190    WritableFile* file;
 191    Status s = env_->NewWritableFile(manifest, &file);
 192    if (!s.ok()) {
 193      return s;
 194    }
 195    {
 196      log::Writer log(file);
 197      std::string record;
 198      new_db.EncodeTo(&record);
 199      s = log.AddRecord(record);
 200      if (s.ok()) {
 201        s = file->Close();
 202      }
 203    }
 204    delete file;
 205    if (s.ok()) {
 206      // Make "CURRENT" file that points to the new manifest file.
 207      s = SetCurrentFile(env_, dbname_, 1);
 208    } else {
 209      env_->DeleteFile(manifest);
 210    }
 211    return s;
 212  }
 213  
 214  void DBImpl::MaybeIgnoreError(Status* s) const {
 215    if (s->ok() || options_.paranoid_checks) {
 216      // No change needed
 217    } else {
 218      Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
 219      *s = Status::OK();
 220    }
 221  }
 222  
 223  void DBImpl::DeleteObsoleteFiles() {
 224    mutex_.AssertHeld();
 225  
 226    if (!bg_error_.ok()) {
 227      // After a background error, we don't know whether a new version may
 228      // or may not have been committed, so we cannot safely garbage collect.
 229      return;
 230    }
 231  
 232    // Make a set of all of the live files
 233    std::set<uint64_t> live = pending_outputs_;
 234    versions_->AddLiveFiles(&live);
 235  
 236    std::vector<std::string> filenames;
 237    env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
 238    uint64_t number;
 239    FileType type;
 240    std::vector<std::string> files_to_delete;
 241    for (std::string& filename : filenames) {
 242      if (ParseFileName(filename, &number, &type)) {
 243        bool keep = true;
 244        switch (type) {
 245          case kLogFile:
 246            keep = ((number >= versions_->LogNumber()) ||
 247                    (number == versions_->PrevLogNumber()));
 248            break;
 249          case kDescriptorFile:
 250            // Keep my manifest file, and any newer incarnations'
 251            // (in case there is a race that allows other incarnations)
 252            keep = (number >= versions_->ManifestFileNumber());
 253            break;
 254          case kTableFile:
 255            keep = (live.find(number) != live.end());
 256            break;
 257          case kTempFile:
 258            // Any temp files that are currently being written to must
 259            // be recorded in pending_outputs_, which is inserted into "live"
 260            keep = (live.find(number) != live.end());
 261            break;
 262          case kCurrentFile:
 263          case kDBLockFile:
 264          case kInfoLogFile:
 265            keep = true;
 266            break;
 267        }
 268  
 269        if (!keep) {
 270          files_to_delete.push_back(std::move(filename));
 271          if (type == kTableFile) {
 272            table_cache_->Evict(number);
 273          }
 274          Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
 275              static_cast<unsigned long long>(number));
 276        }
 277      }
 278    }
 279  
 280    // While deleting all files unblock other threads. All files being deleted
 281    // have unique names which will not collide with newly created files and
 282    // are therefore safe to delete while allowing other threads to proceed.
 283    mutex_.Unlock();
 284    for (const std::string& filename : files_to_delete) {
 285      env_->DeleteFile(dbname_ + "/" + filename);
 286    }
 287    mutex_.Lock();
 288  }
 289  
 290  Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
 291    mutex_.AssertHeld();
 292  
 293    // Ignore error from CreateDir since the creation of the DB is
 294    // committed only when the descriptor is created, and this directory
 295    // may already exist from a previous failed creation attempt.
 296    env_->CreateDir(dbname_);
 297    assert(db_lock_ == nullptr);
 298    Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
 299    if (!s.ok()) {
 300      return s;
 301    }
 302  
 303    if (!env_->FileExists(CurrentFileName(dbname_))) {
 304      if (options_.create_if_missing) {
 305        s = NewDB();
 306        if (!s.ok()) {
 307          return s;
 308        }
 309      } else {
 310        return Status::InvalidArgument(
 311            dbname_, "does not exist (create_if_missing is false)");
 312      }
 313    } else {
 314      if (options_.error_if_exists) {
 315        return Status::InvalidArgument(dbname_,
 316                                       "exists (error_if_exists is true)");
 317      }
 318    }
 319  
 320    s = versions_->Recover(save_manifest);
 321    if (!s.ok()) {
 322      return s;
 323    }
 324    SequenceNumber max_sequence(0);
 325  
 326    // Recover from all newer log files than the ones named in the
 327    // descriptor (new log files may have been added by the previous
 328    // incarnation without registering them in the descriptor).
 329    //
 330    // Note that PrevLogNumber() is no longer used, but we pay
 331    // attention to it in case we are recovering a database
 332    // produced by an older version of leveldb.
 333    const uint64_t min_log = versions_->LogNumber();
 334    const uint64_t prev_log = versions_->PrevLogNumber();
 335    std::vector<std::string> filenames;
 336    s = env_->GetChildren(dbname_, &filenames);
 337    if (!s.ok()) {
 338      return s;
 339    }
 340    std::set<uint64_t> expected;
 341    versions_->AddLiveFiles(&expected);
 342    uint64_t number;
 343    FileType type;
 344    std::vector<uint64_t> logs;
 345    for (size_t i = 0; i < filenames.size(); i++) {
 346      if (ParseFileName(filenames[i], &number, &type)) {
 347        expected.erase(number);
 348        if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
 349          logs.push_back(number);
 350      }
 351    }
 352    if (!expected.empty()) {
 353      char buf[50];
 354      snprintf(buf, sizeof(buf), "%d missing files; e.g.",
 355               static_cast<int>(expected.size()));
 356      return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
 357    }
 358  
 359    // Recover in the order in which the logs were generated
 360    std::sort(logs.begin(), logs.end());
 361    for (size_t i = 0; i < logs.size(); i++) {
 362      s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
 363                         &max_sequence);
 364      if (!s.ok()) {
 365        return s;
 366      }
 367  
 368      // The previous incarnation may not have written any MANIFEST
 369      // records after allocating this log number.  So we manually
 370      // update the file number allocation counter in VersionSet.
 371      versions_->MarkFileNumberUsed(logs[i]);
 372    }
 373  
 374    if (versions_->LastSequence() < max_sequence) {
 375      versions_->SetLastSequence(max_sequence);
 376    }
 377  
 378    return Status::OK();
 379  }
 380  
 381  Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
 382                                bool* save_manifest, VersionEdit* edit,
 383                                SequenceNumber* max_sequence) {
 384    struct LogReporter : public log::Reader::Reporter {
 385      Env* env;
 386      Logger* info_log;
 387      const char* fname;
 388      Status* status;  // null if options_.paranoid_checks==false
 389      void Corruption(size_t bytes, const Status& s) override {
 390        Log(info_log, "%s%s: dropping %d bytes; %s",
 391            (this->status == nullptr ? "(ignoring error) " : ""), fname,
 392            static_cast<int>(bytes), s.ToString().c_str());
 393        if (this->status != nullptr && this->status->ok()) *this->status = s;
 394      }
 395    };
 396  
 397    mutex_.AssertHeld();
 398  
 399    // Open the log file
 400    std::string fname = LogFileName(dbname_, log_number);
 401    SequentialFile* file;
 402    Status status = env_->NewSequentialFile(fname, &file);
 403    if (!status.ok()) {
 404      MaybeIgnoreError(&status);
 405      return status;
 406    }
 407  
 408    // Create the log reader.
 409    LogReporter reporter;
 410    reporter.env = env_;
 411    reporter.info_log = options_.info_log;
 412    reporter.fname = fname.c_str();
 413    reporter.status = (options_.paranoid_checks ? &status : nullptr);
 414    // We intentionally make log::Reader do checksumming even if
 415    // paranoid_checks==false so that corruptions cause entire commits
 416    // to be skipped instead of propagating bad information (like overly
 417    // large sequence numbers).
 418    log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
 419    Log(options_.info_log, "Recovering log #%llu",
 420        (unsigned long long)log_number);
 421  
 422    // Read all the records and add to a memtable
 423    std::string scratch;
 424    Slice record;
 425    WriteBatch batch;
 426    int compactions = 0;
 427    MemTable* mem = nullptr;
 428    while (reader.ReadRecord(&record, &scratch) && status.ok()) {
 429      if (record.size() < 12) {
 430        reporter.Corruption(record.size(),
 431                            Status::Corruption("log record too small", fname));
 432        continue;
 433      }
 434      WriteBatchInternal::SetContents(&batch, record);
 435  
 436      if (mem == nullptr) {
 437        mem = new MemTable(internal_comparator_);
 438        mem->Ref();
 439      }
 440      status = WriteBatchInternal::InsertInto(&batch, mem);
 441      MaybeIgnoreError(&status);
 442      if (!status.ok()) {
 443        break;
 444      }
 445      const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
 446                                      WriteBatchInternal::Count(&batch) - 1;
 447      if (last_seq > *max_sequence) {
 448        *max_sequence = last_seq;
 449      }
 450  
 451      if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
 452        compactions++;
 453        *save_manifest = true;
 454        status = WriteLevel0Table(mem, edit, nullptr);
 455        mem->Unref();
 456        mem = nullptr;
 457        if (!status.ok()) {
 458          // Reflect errors immediately so that conditions like full
 459          // file-systems cause the DB::Open() to fail.
 460          break;
 461        }
 462      }
 463    }
 464  
 465    delete file;
 466  
 467    // See if we should keep reusing the last log file.
 468    if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
 469      assert(logfile_ == nullptr);
 470      assert(log_ == nullptr);
 471      assert(mem_ == nullptr);
 472      uint64_t lfile_size;
 473      if (env_->GetFileSize(fname, &lfile_size).ok() &&
 474          env_->NewAppendableFile(fname, &logfile_).ok()) {
 475        Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
 476        log_ = new log::Writer(logfile_, lfile_size);
 477        logfile_number_ = log_number;
 478        if (mem != nullptr) {
 479          mem_ = mem;
 480          mem = nullptr;
 481        } else {
 482          // mem can be nullptr if lognum exists but was empty.
 483          mem_ = new MemTable(internal_comparator_);
 484          mem_->Ref();
 485        }
 486      }
 487    }
 488  
 489    if (mem != nullptr) {
 490      // mem did not get reused; compact it.
 491      if (status.ok()) {
 492        *save_manifest = true;
 493        status = WriteLevel0Table(mem, edit, nullptr);
 494      }
 495      mem->Unref();
 496    }
 497  
 498    return status;
 499  }
 500  
 501  Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
 502                                  Version* base) {
 503    mutex_.AssertHeld();
 504    const uint64_t start_micros = env_->NowMicros();
 505    FileMetaData meta;
 506    meta.number = versions_->NewFileNumber();
 507    pending_outputs_.insert(meta.number);
 508    Iterator* iter = mem->NewIterator();
 509    Log(options_.info_log, "Level-0 table #%llu: started",
 510        (unsigned long long)meta.number);
 511  
 512    Status s;
 513    {
 514      mutex_.Unlock();
 515      s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
 516      mutex_.Lock();
 517    }
 518  
 519    Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
 520        (unsigned long long)meta.number, (unsigned long long)meta.file_size,
 521        s.ToString().c_str());
 522    delete iter;
 523    pending_outputs_.erase(meta.number);
 524  
 525    // Note that if file_size is zero, the file has been deleted and
 526    // should not be added to the manifest.
 527    int level = 0;
 528    if (s.ok() && meta.file_size > 0) {
 529      const Slice min_user_key = meta.smallest.user_key();
 530      const Slice max_user_key = meta.largest.user_key();
 531      if (base != nullptr) {
 532        level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
 533      }
 534      edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
 535                    meta.largest);
 536    }
 537  
 538    CompactionStats stats;
 539    stats.micros = env_->NowMicros() - start_micros;
 540    stats.bytes_written = meta.file_size;
 541    stats_[level].Add(stats);
 542    return s;
 543  }
 544  
 545  void DBImpl::CompactMemTable() {
 546    mutex_.AssertHeld();
 547    assert(imm_ != nullptr);
 548  
 549    // Save the contents of the memtable as a new Table
 550    VersionEdit edit;
 551    Version* base = versions_->current();
 552    base->Ref();
 553    Status s = WriteLevel0Table(imm_, &edit, base);
 554    base->Unref();
 555  
 556    if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
 557      s = Status::IOError("Deleting DB during memtable compaction");
 558    }
 559  
 560    // Replace immutable memtable with the generated Table
 561    if (s.ok()) {
 562      edit.SetPrevLogNumber(0);
 563      edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
 564      s = versions_->LogAndApply(&edit, &mutex_);
 565    }
 566  
 567    if (s.ok()) {
 568      // Commit to the new state
 569      imm_->Unref();
 570      imm_ = nullptr;
 571      has_imm_.store(false, std::memory_order_release);
 572      DeleteObsoleteFiles();
 573    } else {
 574      RecordBackgroundError(s);
 575    }
 576  }
 577  
 578  void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
 579    int max_level_with_files = 1;
 580    {
 581      MutexLock l(&mutex_);
 582      Version* base = versions_->current();
 583      for (int level = 1; level < config::kNumLevels; level++) {
 584        if (base->OverlapInLevel(level, begin, end)) {
 585          max_level_with_files = level;
 586        }
 587      }
 588    }
 589    TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
 590    for (int level = 0; level < max_level_with_files; level++) {
 591      TEST_CompactRange(level, begin, end);
 592    }
 593  }
 594  
 595  void DBImpl::TEST_CompactRange(int level, const Slice* begin,
 596                                 const Slice* end) {
 597    assert(level >= 0);
 598    assert(level + 1 < config::kNumLevels);
 599  
 600    InternalKey begin_storage, end_storage;
 601  
 602    ManualCompaction manual;
 603    manual.level = level;
 604    manual.done = false;
 605    if (begin == nullptr) {
 606      manual.begin = nullptr;
 607    } else {
 608      begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
 609      manual.begin = &begin_storage;
 610    }
 611    if (end == nullptr) {
 612      manual.end = nullptr;
 613    } else {
 614      end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
 615      manual.end = &end_storage;
 616    }
 617  
 618    MutexLock l(&mutex_);
 619    while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
 620           bg_error_.ok()) {
 621      if (manual_compaction_ == nullptr) {  // Idle
 622        manual_compaction_ = &manual;
 623        MaybeScheduleCompaction();
 624      } else {  // Running either my compaction or another compaction.
 625        background_work_finished_signal_.Wait();
 626      }
 627    }
 628    if (manual_compaction_ == &manual) {
 629      // Cancel my manual compaction since we aborted early for some reason.
 630      manual_compaction_ = nullptr;
 631    }
 632  }
 633  
 634  Status DBImpl::TEST_CompactMemTable() {
 635    // nullptr batch means just wait for earlier writes to be done
 636    Status s = Write(WriteOptions(), nullptr);
 637    if (s.ok()) {
 638      // Wait until the compaction completes
 639      MutexLock l(&mutex_);
 640      while (imm_ != nullptr && bg_error_.ok()) {
 641        background_work_finished_signal_.Wait();
 642      }
 643      if (imm_ != nullptr) {
 644        s = bg_error_;
 645      }
 646    }
 647    return s;
 648  }
 649  
 650  void DBImpl::RecordBackgroundError(const Status& s) {
 651    mutex_.AssertHeld();
 652    if (bg_error_.ok()) {
 653      bg_error_ = s;
 654      background_work_finished_signal_.SignalAll();
 655    }
 656  }
 657  
 658  void DBImpl::MaybeScheduleCompaction() {
 659    mutex_.AssertHeld();
 660    if (background_compaction_scheduled_) {
 661      // Already scheduled
 662    } else if (shutting_down_.load(std::memory_order_acquire)) {
 663      // DB is being deleted; no more background compactions
 664    } else if (!bg_error_.ok()) {
 665      // Already got an error; no more changes
 666    } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
 667               !versions_->NeedsCompaction()) {
 668      // No work to be done
 669    } else {
 670      background_compaction_scheduled_ = true;
 671      env_->Schedule(&DBImpl::BGWork, this);
 672    }
 673  }
 674  
 675  void DBImpl::BGWork(void* db) {
 676    reinterpret_cast<DBImpl*>(db)->BackgroundCall();
 677  }
 678  
 679  void DBImpl::BackgroundCall() {
 680    MutexLock l(&mutex_);
 681    assert(background_compaction_scheduled_);
 682    if (shutting_down_.load(std::memory_order_acquire)) {
 683      // No more background work when shutting down.
 684    } else if (!bg_error_.ok()) {
 685      // No more background work after a background error.
 686    } else {
 687      BackgroundCompaction();
 688    }
 689  
 690    background_compaction_scheduled_ = false;
 691  
 692    // Previous compaction may have produced too many files in a level,
 693    // so reschedule another compaction if needed.
 694    MaybeScheduleCompaction();
 695    background_work_finished_signal_.SignalAll();
 696  }
 697  
 698  void DBImpl::BackgroundCompaction() {
 699    mutex_.AssertHeld();
 700  
 701    if (imm_ != nullptr) {
 702      CompactMemTable();
 703      return;
 704    }
 705  
 706    Compaction* c;
 707    bool is_manual = (manual_compaction_ != nullptr);
 708    InternalKey manual_end;
 709    if (is_manual) {
 710      ManualCompaction* m = manual_compaction_;
 711      c = versions_->CompactRange(m->level, m->begin, m->end);
 712      m->done = (c == nullptr);
 713      if (c != nullptr) {
 714        manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
 715      }
 716      Log(options_.info_log,
 717          "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
 718          m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
 719          (m->end ? m->end->DebugString().c_str() : "(end)"),
 720          (m->done ? "(end)" : manual_end.DebugString().c_str()));
 721    } else {
 722      c = versions_->PickCompaction();
 723    }
 724  
 725    Status status;
 726    if (c == nullptr) {
 727      // Nothing to do
 728    } else if (!is_manual && c->IsTrivialMove()) {
 729      // Move file to next level
 730      assert(c->num_input_files(0) == 1);
 731      FileMetaData* f = c->input(0, 0);
 732      c->edit()->DeleteFile(c->level(), f->number);
 733      c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
 734                         f->largest);
 735      status = versions_->LogAndApply(c->edit(), &mutex_);
 736      if (!status.ok()) {
 737        RecordBackgroundError(status);
 738      }
 739      VersionSet::LevelSummaryStorage tmp;
 740      Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
 741          static_cast<unsigned long long>(f->number), c->level() + 1,
 742          static_cast<unsigned long long>(f->file_size),
 743          status.ToString().c_str(), versions_->LevelSummary(&tmp));
 744    } else {
 745      CompactionState* compact = new CompactionState(c);
 746      status = DoCompactionWork(compact);
 747      if (!status.ok()) {
 748        RecordBackgroundError(status);
 749      }
 750      CleanupCompaction(compact);
 751      c->ReleaseInputs();
 752      DeleteObsoleteFiles();
 753    }
 754    delete c;
 755  
 756    if (status.ok()) {
 757      // Done
 758    } else if (shutting_down_.load(std::memory_order_acquire)) {
 759      // Ignore compaction errors found during shutting down
 760    } else {
 761      Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
 762    }
 763  
 764    if (is_manual) {
 765      ManualCompaction* m = manual_compaction_;
 766      if (!status.ok()) {
 767        m->done = true;
 768      }
 769      if (!m->done) {
 770        // We only compacted part of the requested range.  Update *m
 771        // to the range that is left to be compacted.
 772        m->tmp_storage = manual_end;
 773        m->begin = &m->tmp_storage;
 774      }
 775      manual_compaction_ = nullptr;
 776    }
 777  }
 778  
 779  void DBImpl::CleanupCompaction(CompactionState* compact) {
 780    mutex_.AssertHeld();
 781    if (compact->builder != nullptr) {
 782      // May happen if we get a shutdown call in the middle of compaction
 783      compact->builder->Abandon();
 784      delete compact->builder;
 785    } else {
 786      assert(compact->outfile == nullptr);
 787    }
 788    delete compact->outfile;
 789    for (size_t i = 0; i < compact->outputs.size(); i++) {
 790      const CompactionState::Output& out = compact->outputs[i];
 791      pending_outputs_.erase(out.number);
 792    }
 793    delete compact;
 794  }
 795  
 796  Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
 797    assert(compact != nullptr);
 798    assert(compact->builder == nullptr);
 799    uint64_t file_number;
 800    {
 801      mutex_.Lock();
 802      file_number = versions_->NewFileNumber();
 803      pending_outputs_.insert(file_number);
 804      CompactionState::Output out;
 805      out.number = file_number;
 806      out.smallest.Clear();
 807      out.largest.Clear();
 808      compact->outputs.push_back(out);
 809      mutex_.Unlock();
 810    }
 811  
 812    // Make the output file
 813    std::string fname = TableFileName(dbname_, file_number);
 814    Status s = env_->NewWritableFile(fname, &compact->outfile);
 815    if (s.ok()) {
 816      compact->builder = new TableBuilder(options_, compact->outfile);
 817    }
 818    return s;
 819  }
 820  
 821  Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
 822                                            Iterator* input) {
 823    assert(compact != nullptr);
 824    assert(compact->outfile != nullptr);
 825    assert(compact->builder != nullptr);
 826  
 827    const uint64_t output_number = compact->current_output()->number;
 828    assert(output_number != 0);
 829  
 830    // Check for iterator errors
 831    Status s = input->status();
 832    const uint64_t current_entries = compact->builder->NumEntries();
 833    if (s.ok()) {
 834      s = compact->builder->Finish();
 835    } else {
 836      compact->builder->Abandon();
 837    }
 838    const uint64_t current_bytes = compact->builder->FileSize();
 839    compact->current_output()->file_size = current_bytes;
 840    compact->total_bytes += current_bytes;
 841    delete compact->builder;
 842    compact->builder = nullptr;
 843  
 844    // Finish and check for file errors
 845    if (s.ok()) {
 846      s = compact->outfile->Sync();
 847    }
 848    if (s.ok()) {
 849      s = compact->outfile->Close();
 850    }
 851    delete compact->outfile;
 852    compact->outfile = nullptr;
 853  
 854    if (s.ok() && current_entries > 0) {
 855      // Verify that the table is usable
 856      Iterator* iter =
 857          table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
 858      s = iter->status();
 859      delete iter;
 860      if (s.ok()) {
 861        Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
 862            (unsigned long long)output_number, compact->compaction->level(),
 863            (unsigned long long)current_entries,
 864            (unsigned long long)current_bytes);
 865      }
 866    }
 867    return s;
 868  }
 869  
 870  Status DBImpl::InstallCompactionResults(CompactionState* compact) {
 871    mutex_.AssertHeld();
 872    Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
 873        compact->compaction->num_input_files(0), compact->compaction->level(),
 874        compact->compaction->num_input_files(1), compact->compaction->level() + 1,
 875        static_cast<long long>(compact->total_bytes));
 876  
 877    // Add compaction outputs
 878    compact->compaction->AddInputDeletions(compact->compaction->edit());
 879    const int level = compact->compaction->level();
 880    for (size_t i = 0; i < compact->outputs.size(); i++) {
 881      const CompactionState::Output& out = compact->outputs[i];
 882      compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
 883                                           out.smallest, out.largest);
 884    }
 885    return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 886  }
 887  
 888  Status DBImpl::DoCompactionWork(CompactionState* compact) {
 889    const uint64_t start_micros = env_->NowMicros();
 890    int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
 891  
 892    Log(options_.info_log, "Compacting %d@%d + %d@%d files",
 893        compact->compaction->num_input_files(0), compact->compaction->level(),
 894        compact->compaction->num_input_files(1),
 895        compact->compaction->level() + 1);
 896  
 897    assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
 898    assert(compact->builder == nullptr);
 899    assert(compact->outfile == nullptr);
 900    if (snapshots_.empty()) {
 901      compact->smallest_snapshot = versions_->LastSequence();
 902    } else {
 903      compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
 904    }
 905  
 906    Iterator* input = versions_->MakeInputIterator(compact->compaction);
 907  
 908    // Release mutex while we're actually doing the compaction work
 909    mutex_.Unlock();
 910  
 911    input->SeekToFirst();
 912    Status status;
 913    ParsedInternalKey ikey;
 914    std::string current_user_key;
 915    bool has_current_user_key = false;
 916    SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
 917    while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
 918      // Prioritize immutable compaction work
 919      if (has_imm_.load(std::memory_order_relaxed)) {
 920        const uint64_t imm_start = env_->NowMicros();
 921        mutex_.Lock();
 922        if (imm_ != nullptr) {
 923          CompactMemTable();
 924          // Wake up MakeRoomForWrite() if necessary.
 925          background_work_finished_signal_.SignalAll();
 926        }
 927        mutex_.Unlock();
 928        imm_micros += (env_->NowMicros() - imm_start);
 929      }
 930  
 931      Slice key = input->key();
 932      if (compact->compaction->ShouldStopBefore(key) &&
 933          compact->builder != nullptr) {
 934        status = FinishCompactionOutputFile(compact, input);
 935        if (!status.ok()) {
 936          break;
 937        }
 938      }
 939  
 940      // Handle key/value, add to state, etc.
 941      bool drop = false;
 942      if (!ParseInternalKey(key, &ikey)) {
 943        // Do not hide error keys
 944        current_user_key.clear();
 945        has_current_user_key = false;
 946        last_sequence_for_key = kMaxSequenceNumber;
 947      } else {
 948        if (!has_current_user_key ||
 949            user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
 950                0) {
 951          // First occurrence of this user key
 952          current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
 953          has_current_user_key = true;
 954          last_sequence_for_key = kMaxSequenceNumber;
 955        }
 956  
 957        if (last_sequence_for_key <= compact->smallest_snapshot) {
 958          // Hidden by an newer entry for same user key
 959          drop = true;  // (A)
 960        } else if (ikey.type == kTypeDeletion &&
 961                   ikey.sequence <= compact->smallest_snapshot &&
 962                   compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
 963          // For this user key:
 964          // (1) there is no data in higher levels
 965          // (2) data in lower levels will have larger sequence numbers
 966          // (3) data in layers that are being compacted here and have
 967          //     smaller sequence numbers will be dropped in the next
 968          //     few iterations of this loop (by rule (A) above).
 969          // Therefore this deletion marker is obsolete and can be dropped.
 970          drop = true;
 971        }
 972  
 973        last_sequence_for_key = ikey.sequence;
 974      }
 975  #if 0
 976      Log(options_.info_log,
 977          "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
 978          "%d smallest_snapshot: %d",
 979          ikey.user_key.ToString().c_str(),
 980          (int)ikey.sequence, ikey.type, kTypeValue, drop,
 981          compact->compaction->IsBaseLevelForKey(ikey.user_key),
 982          (int)last_sequence_for_key, (int)compact->smallest_snapshot);
 983  #endif
 984  
 985      if (!drop) {
 986        // Open output file if necessary
 987        if (compact->builder == nullptr) {
 988          status = OpenCompactionOutputFile(compact);
 989          if (!status.ok()) {
 990            break;
 991          }
 992        }
 993        if (compact->builder->NumEntries() == 0) {
 994          compact->current_output()->smallest.DecodeFrom(key);
 995        }
 996        compact->current_output()->largest.DecodeFrom(key);
 997        compact->builder->Add(key, input->value());
 998  
 999        // Close output file if it is big enough
1000        if (compact->builder->FileSize() >=
1001            compact->compaction->MaxOutputFileSize()) {
1002          status = FinishCompactionOutputFile(compact, input);
1003          if (!status.ok()) {
1004            break;
1005          }
1006        }
1007      }
1008  
1009      input->Next();
1010    }
1011  
1012    if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
1013      status = Status::IOError("Deleting DB during compaction");
1014    }
1015    if (status.ok() && compact->builder != nullptr) {
1016      status = FinishCompactionOutputFile(compact, input);
1017    }
1018    if (status.ok()) {
1019      status = input->status();
1020    }
1021    delete input;
1022    input = nullptr;
1023  
1024    CompactionStats stats;
1025    stats.micros = env_->NowMicros() - start_micros - imm_micros;
1026    for (int which = 0; which < 2; which++) {
1027      for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
1028        stats.bytes_read += compact->compaction->input(which, i)->file_size;
1029      }
1030    }
1031    for (size_t i = 0; i < compact->outputs.size(); i++) {
1032      stats.bytes_written += compact->outputs[i].file_size;
1033    }
1034  
1035    mutex_.Lock();
1036    stats_[compact->compaction->level() + 1].Add(stats);
1037  
1038    if (status.ok()) {
1039      status = InstallCompactionResults(compact);
1040    }
1041    if (!status.ok()) {
1042      RecordBackgroundError(status);
1043    }
1044    VersionSet::LevelSummaryStorage tmp;
1045    Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
1046    return status;
1047  }
1048  
1049  namespace {
1050  
1051  struct IterState {
1052    port::Mutex* const mu;
1053    Version* const version GUARDED_BY(mu);
1054    MemTable* const mem GUARDED_BY(mu);
1055    MemTable* const imm GUARDED_BY(mu);
1056  
1057    IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
1058        : mu(mutex), version(version), mem(mem), imm(imm) {}
1059  };
1060  
1061  static void CleanupIteratorState(void* arg1, void* arg2) {
1062    IterState* state = reinterpret_cast<IterState*>(arg1);
1063    state->mu->Lock();
1064    state->mem->Unref();
1065    if (state->imm != nullptr) state->imm->Unref();
1066    state->version->Unref();
1067    state->mu->Unlock();
1068    delete state;
1069  }
1070  
1071  }  // anonymous namespace
1072  
1073  Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
1074                                        SequenceNumber* latest_snapshot,
1075                                        uint32_t* seed) {
1076    mutex_.Lock();
1077    *latest_snapshot = versions_->LastSequence();
1078  
1079    // Collect together all needed child iterators
1080    std::vector<Iterator*> list;
1081    list.push_back(mem_->NewIterator());
1082    mem_->Ref();
1083    if (imm_ != nullptr) {
1084      list.push_back(imm_->NewIterator());
1085      imm_->Ref();
1086    }
1087    versions_->current()->AddIterators(options, &list);
1088    Iterator* internal_iter =
1089        NewMergingIterator(&internal_comparator_, &list[0], list.size());
1090    versions_->current()->Ref();
1091  
1092    IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
1093    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1094  
1095    *seed = ++seed_;
1096    mutex_.Unlock();
1097    return internal_iter;
1098  }
1099  
1100  Iterator* DBImpl::TEST_NewInternalIterator() {
1101    SequenceNumber ignored;
1102    uint32_t ignored_seed;
1103    return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1104  }
1105  
1106  int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1107    MutexLock l(&mutex_);
1108    return versions_->MaxNextLevelOverlappingBytes();
1109  }
1110  
1111  Status DBImpl::Get(const ReadOptions& options, const Slice& key,
1112                     std::string* value) {
1113    Status s;
1114    MutexLock l(&mutex_);
1115    SequenceNumber snapshot;
1116    if (options.snapshot != nullptr) {
1117      snapshot =
1118          static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
1119    } else {
1120      snapshot = versions_->LastSequence();
1121    }
1122  
1123    MemTable* mem = mem_;
1124    MemTable* imm = imm_;
1125    Version* current = versions_->current();
1126    mem->Ref();
1127    if (imm != nullptr) imm->Ref();
1128    current->Ref();
1129  
1130    bool have_stat_update = false;
1131    Version::GetStats stats;
1132  
1133    // Unlock while reading from files and memtables
1134    {
1135      mutex_.Unlock();
1136      // First look in the memtable, then in the immutable memtable (if any).
1137      LookupKey lkey(key, snapshot);
1138      if (mem->Get(lkey, value, &s)) {
1139        // Done
1140      } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
1141        // Done
1142      } else {
1143        s = current->Get(options, lkey, value, &stats);
1144        have_stat_update = true;
1145      }
1146      mutex_.Lock();
1147    }
1148  
1149    if (have_stat_update && current->UpdateStats(stats)) {
1150      MaybeScheduleCompaction();
1151    }
1152    mem->Unref();
1153    if (imm != nullptr) imm->Unref();
1154    current->Unref();
1155    return s;
1156  }
1157  
1158  Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1159    SequenceNumber latest_snapshot;
1160    uint32_t seed;
1161    Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1162    return NewDBIterator(this, user_comparator(), iter,
1163                         (options.snapshot != nullptr
1164                              ? static_cast<const SnapshotImpl*>(options.snapshot)
1165                                    ->sequence_number()
1166                              : latest_snapshot),
1167                         seed);
1168  }
1169  
1170  void DBImpl::RecordReadSample(Slice key) {
1171    MutexLock l(&mutex_);
1172    if (versions_->current()->RecordReadSample(key)) {
1173      MaybeScheduleCompaction();
1174    }
1175  }
1176  
1177  const Snapshot* DBImpl::GetSnapshot() {
1178    MutexLock l(&mutex_);
1179    return snapshots_.New(versions_->LastSequence());
1180  }
1181  
1182  void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
1183    MutexLock l(&mutex_);
1184    snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
1185  }
1186  
1187  // Convenience methods
1188  Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1189    return DB::Put(o, key, val);
1190  }
1191  
1192  Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1193    return DB::Delete(options, key);
1194  }
1195  
1196  Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1197    Writer w(&mutex_);
1198    w.batch = updates;
1199    w.sync = options.sync;
1200    w.done = false;
1201  
1202    MutexLock l(&mutex_);
1203    writers_.push_back(&w);
1204    while (!w.done && &w != writers_.front()) {
1205      w.cv.Wait();
1206    }
1207    if (w.done) {
1208      return w.status;
1209    }
1210  
1211    // May temporarily unlock and wait.
1212    Status status = MakeRoomForWrite(updates == nullptr);
1213    uint64_t last_sequence = versions_->LastSequence();
1214    Writer* last_writer = &w;
1215    if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
1216      WriteBatch* write_batch = BuildBatchGroup(&last_writer);
1217      WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
1218      last_sequence += WriteBatchInternal::Count(write_batch);
1219  
1220      // Add to log and apply to memtable.  We can release the lock
1221      // during this phase since &w is currently responsible for logging
1222      // and protects against concurrent loggers and concurrent writes
1223      // into mem_.
1224      {
1225        mutex_.Unlock();
1226        status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
1227        bool sync_error = false;
1228        if (status.ok() && options.sync) {
1229          status = logfile_->Sync();
1230          if (!status.ok()) {
1231            sync_error = true;
1232          }
1233        }
1234        if (status.ok()) {
1235          status = WriteBatchInternal::InsertInto(write_batch, mem_);
1236        }
1237        mutex_.Lock();
1238        if (sync_error) {
1239          // The state of the log file is indeterminate: the log record we
1240          // just added may or may not show up when the DB is re-opened.
1241          // So we force the DB into a mode where all future writes fail.
1242          RecordBackgroundError(status);
1243        }
1244      }
1245      if (write_batch == tmp_batch_) tmp_batch_->Clear();
1246  
1247      versions_->SetLastSequence(last_sequence);
1248    }
1249  
1250    while (true) {
1251      Writer* ready = writers_.front();
1252      writers_.pop_front();
1253      if (ready != &w) {
1254        ready->status = status;
1255        ready->done = true;
1256        ready->cv.Signal();
1257      }
1258      if (ready == last_writer) break;
1259    }
1260  
1261    // Notify new head of write queue
1262    if (!writers_.empty()) {
1263      writers_.front()->cv.Signal();
1264    }
1265  
1266    return status;
1267  }
1268  
1269  // REQUIRES: Writer list must be non-empty
1270  // REQUIRES: First writer must have a non-null batch
1271  WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1272    mutex_.AssertHeld();
1273    assert(!writers_.empty());
1274    Writer* first = writers_.front();
1275    WriteBatch* result = first->batch;
1276    assert(result != nullptr);
1277  
1278    size_t size = WriteBatchInternal::ByteSize(first->batch);
1279  
1280    // Allow the group to grow up to a maximum size, but if the
1281    // original write is small, limit the growth so we do not slow
1282    // down the small write too much.
1283    size_t max_size = 1 << 20;
1284    if (size <= (128 << 10)) {
1285      max_size = size + (128 << 10);
1286    }
1287  
1288    *last_writer = first;
1289    std::deque<Writer*>::iterator iter = writers_.begin();
1290    ++iter;  // Advance past "first"
1291    for (; iter != writers_.end(); ++iter) {
1292      Writer* w = *iter;
1293      if (w->sync && !first->sync) {
1294        // Do not include a sync write into a batch handled by a non-sync write.
1295        break;
1296      }
1297  
1298      if (w->batch != nullptr) {
1299        size += WriteBatchInternal::ByteSize(w->batch);
1300        if (size > max_size) {
1301          // Do not make batch too big
1302          break;
1303        }
1304  
1305        // Append to *result
1306        if (result == first->batch) {
1307          // Switch to temporary batch instead of disturbing caller's batch
1308          result = tmp_batch_;
1309          assert(WriteBatchInternal::Count(result) == 0);
1310          WriteBatchInternal::Append(result, first->batch);
1311        }
1312        WriteBatchInternal::Append(result, w->batch);
1313      }
1314      *last_writer = w;
1315    }
1316    return result;
1317  }
1318  
1319  // REQUIRES: mutex_ is held
1320  // REQUIRES: this thread is currently at the front of the writer queue
1321  Status DBImpl::MakeRoomForWrite(bool force) {
1322    mutex_.AssertHeld();
1323    assert(!writers_.empty());
1324    bool allow_delay = !force;
1325    Status s;
1326    while (true) {
1327      if (!bg_error_.ok()) {
1328        // Yield previous error
1329        s = bg_error_;
1330        break;
1331      } else if (allow_delay && versions_->NumLevelFiles(0) >=
1332                                    config::kL0_SlowdownWritesTrigger) {
1333        // We are getting close to hitting a hard limit on the number of
1334        // L0 files.  Rather than delaying a single write by several
1335        // seconds when we hit the hard limit, start delaying each
1336        // individual write by 1ms to reduce latency variance.  Also,
1337        // this delay hands over some CPU to the compaction thread in
1338        // case it is sharing the same core as the writer.
1339        mutex_.Unlock();
1340        env_->SleepForMicroseconds(1000);
1341        allow_delay = false;  // Do not delay a single write more than once
1342        mutex_.Lock();
1343      } else if (!force &&
1344                 (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1345        // There is room in current memtable
1346        break;
1347      } else if (imm_ != nullptr) {
1348        // We have filled up the current memtable, but the previous
1349        // one is still being compacted, so we wait.
1350        Log(options_.info_log, "Current memtable full; waiting...\n");
1351        background_work_finished_signal_.Wait();
1352      } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1353        // There are too many level-0 files.
1354        Log(options_.info_log, "Too many L0 files; waiting...\n");
1355        background_work_finished_signal_.Wait();
1356      } else {
1357        // Attempt to switch to a new memtable and trigger compaction of old
1358        assert(versions_->PrevLogNumber() == 0);
1359        uint64_t new_log_number = versions_->NewFileNumber();
1360        WritableFile* lfile = nullptr;
1361        s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1362        if (!s.ok()) {
1363          // Avoid chewing through file number space in a tight loop.
1364          versions_->ReuseFileNumber(new_log_number);
1365          break;
1366        }
1367        delete log_;
1368        delete logfile_;
1369        logfile_ = lfile;
1370        logfile_number_ = new_log_number;
1371        log_ = new log::Writer(lfile);
1372        imm_ = mem_;
1373        has_imm_.store(true, std::memory_order_release);
1374        mem_ = new MemTable(internal_comparator_);
1375        mem_->Ref();
1376        force = false;  // Do not force another compaction if have room
1377        MaybeScheduleCompaction();
1378      }
1379    }
1380    return s;
1381  }
1382  
1383  bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1384    value->clear();
1385  
1386    MutexLock l(&mutex_);
1387    Slice in = property;
1388    Slice prefix("leveldb.");
1389    if (!in.starts_with(prefix)) return false;
1390    in.remove_prefix(prefix.size());
1391  
1392    if (in.starts_with("num-files-at-level")) {
1393      in.remove_prefix(strlen("num-files-at-level"));
1394      uint64_t level;
1395      bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1396      if (!ok || level >= config::kNumLevels) {
1397        return false;
1398      } else {
1399        char buf[100];
1400        snprintf(buf, sizeof(buf), "%d",
1401                 versions_->NumLevelFiles(static_cast<int>(level)));
1402        *value = buf;
1403        return true;
1404      }
1405    } else if (in == "stats") {
1406      char buf[200];
1407      snprintf(buf, sizeof(buf),
1408               "                               Compactions\n"
1409               "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1410               "--------------------------------------------------\n");
1411      value->append(buf);
1412      for (int level = 0; level < config::kNumLevels; level++) {
1413        int files = versions_->NumLevelFiles(level);
1414        if (stats_[level].micros > 0 || files > 0) {
1415          snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
1416                   files, versions_->NumLevelBytes(level) / 1048576.0,
1417                   stats_[level].micros / 1e6,
1418                   stats_[level].bytes_read / 1048576.0,
1419                   stats_[level].bytes_written / 1048576.0);
1420          value->append(buf);
1421        }
1422      }
1423      return true;
1424    } else if (in == "sstables") {
1425      *value = versions_->current()->DebugString();
1426      return true;
1427    } else if (in == "approximate-memory-usage") {
1428      size_t total_usage = options_.block_cache->TotalCharge();
1429      if (mem_) {
1430        total_usage += mem_->ApproximateMemoryUsage();
1431      }
1432      if (imm_) {
1433        total_usage += imm_->ApproximateMemoryUsage();
1434      }
1435      char buf[50];
1436      snprintf(buf, sizeof(buf), "%llu",
1437               static_cast<unsigned long long>(total_usage));
1438      value->append(buf);
1439      return true;
1440    }
1441  
1442    return false;
1443  }
1444  
1445  void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
1446    // TODO(opt): better implementation
1447    MutexLock l(&mutex_);
1448    Version* v = versions_->current();
1449    v->Ref();
1450  
1451    for (int i = 0; i < n; i++) {
1452      // Convert user_key into a corresponding internal key.
1453      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1454      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1455      uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1456      uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1457      sizes[i] = (limit >= start ? limit - start : 0);
1458    }
1459  
1460    v->Unref();
1461  }
1462  
1463  // Default implementations of convenience methods that subclasses of DB
1464  // can call if they wish
1465  Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1466    WriteBatch batch;
1467    batch.Put(key, value);
1468    return Write(opt, &batch);
1469  }
1470  
1471  Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1472    WriteBatch batch;
1473    batch.Delete(key);
1474    return Write(opt, &batch);
1475  }
1476  
// Out-of-line definition of the DB destructor; nothing beyond the
// compiler-generated behavior is required.
DB::~DB() = default;
1478  
1479  Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1480    *dbptr = nullptr;
1481  
1482    DBImpl* impl = new DBImpl(options, dbname);
1483    impl->mutex_.Lock();
1484    VersionEdit edit;
1485    // Recover handles create_if_missing, error_if_exists
1486    bool save_manifest = false;
1487    Status s = impl->Recover(&edit, &save_manifest);
1488    if (s.ok() && impl->mem_ == nullptr) {
1489      // Create new log and a corresponding memtable.
1490      uint64_t new_log_number = impl->versions_->NewFileNumber();
1491      WritableFile* lfile;
1492      s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1493                                       &lfile);
1494      if (s.ok()) {
1495        edit.SetLogNumber(new_log_number);
1496        impl->logfile_ = lfile;
1497        impl->logfile_number_ = new_log_number;
1498        impl->log_ = new log::Writer(lfile);
1499        impl->mem_ = new MemTable(impl->internal_comparator_);
1500        impl->mem_->Ref();
1501      }
1502    }
1503    if (s.ok() && save_manifest) {
1504      edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
1505      edit.SetLogNumber(impl->logfile_number_);
1506      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1507    }
1508    if (s.ok()) {
1509      impl->DeleteObsoleteFiles();
1510      impl->MaybeScheduleCompaction();
1511    }
1512    impl->mutex_.Unlock();
1513    if (s.ok()) {
1514      assert(impl->mem_ != nullptr);
1515      *dbptr = impl;
1516    } else {
1517      delete impl;
1518    }
1519    return s;
1520  }
1521  
// Out-of-line definition of the Snapshot destructor; nothing beyond the
// compiler-generated behavior is required.
Snapshot::~Snapshot() = default;
1523  
1524  Status DestroyDB(const std::string& dbname, const Options& options) {
1525    Env* env = options.env;
1526    std::vector<std::string> filenames;
1527    Status result = env->GetChildren(dbname, &filenames);
1528    if (!result.ok()) {
1529      // Ignore error in case directory does not exist
1530      return Status::OK();
1531    }
1532  
1533    FileLock* lock;
1534    const std::string lockname = LockFileName(dbname);
1535    result = env->LockFile(lockname, &lock);
1536    if (result.ok()) {
1537      uint64_t number;
1538      FileType type;
1539      for (size_t i = 0; i < filenames.size(); i++) {
1540        if (ParseFileName(filenames[i], &number, &type) &&
1541            type != kDBLockFile) {  // Lock file will be deleted at end
1542          Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1543          if (result.ok() && !del.ok()) {
1544            result = del;
1545          }
1546        }
1547      }
1548      env->UnlockFile(lock);  // Ignore error since state is already gone
1549      env->DeleteFile(lockname);
1550      env->DeleteDir(dbname);  // Ignore error in case dir contains other files
1551    }
1552    return result;
1553  }
1554  
1555  }  // namespace leveldb