/ src / leveldb / table / table_builder.cc
table_builder.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  
  5  #include "leveldb/table_builder.h"
  6  
  7  #include <assert.h>
  8  
  9  #include "leveldb/comparator.h"
 10  #include "leveldb/env.h"
 11  #include "leveldb/filter_policy.h"
 12  #include "leveldb/options.h"
 13  #include "table/block_builder.h"
 14  #include "table/filter_block.h"
 15  #include "table/format.h"
 16  #include "util/coding.h"
 17  #include "util/crc32c.h"
 18  
 19  namespace leveldb {
 20  
 21  struct TableBuilder::Rep {
 22    Rep(const Options& opt, WritableFile* f)
 23        : options(opt),
 24          index_block_options(opt),
 25          file(f),
 26          offset(0),
 27          data_block(&options),
 28          index_block(&index_block_options),
 29          num_entries(0),
 30          closed(false),
 31          filter_block(opt.filter_policy == nullptr
 32                           ? nullptr
 33                           : new FilterBlockBuilder(opt.filter_policy)),
 34          pending_index_entry(false) {
 35      index_block_options.block_restart_interval = 1;
 36    }
 37  
 38    Options options;
 39    Options index_block_options;
 40    WritableFile* file;
 41    uint64_t offset;
 42    Status status;
 43    BlockBuilder data_block;
 44    BlockBuilder index_block;
 45    std::string last_key;
 46    int64_t num_entries;
 47    bool closed;  // Either Finish() or Abandon() has been called.
 48    FilterBlockBuilder* filter_block;
 49  
 50    // We do not emit the index entry for a block until we have seen the
 51    // first key for the next data block.  This allows us to use shorter
 52    // keys in the index block.  For example, consider a block boundary
 53    // between the keys "the quick brown fox" and "the who".  We can use
 54    // "the r" as the key for the index block entry since it is >= all
 55    // entries in the first block and < all entries in subsequent
 56    // blocks.
 57    //
 58    // Invariant: r->pending_index_entry is true only if data_block is empty.
 59    bool pending_index_entry;
 60    BlockHandle pending_handle;  // Handle to add to index block
 61  
 62    std::string compressed_output;
 63  };
 64  
 65  TableBuilder::TableBuilder(const Options& options, WritableFile* file)
 66      : rep_(new Rep(options, file)) {
 67    if (rep_->filter_block != nullptr) {
 68      rep_->filter_block->StartBlock(0);
 69    }
 70  }
 71  
 72  TableBuilder::~TableBuilder() {
 73    assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
 74    delete rep_->filter_block;
 75    delete rep_;
 76  }
 77  
 78  Status TableBuilder::ChangeOptions(const Options& options) {
 79    // Note: if more fields are added to Options, update
 80    // this function to catch changes that should not be allowed to
 81    // change in the middle of building a Table.
 82    if (options.comparator != rep_->options.comparator) {
 83      return Status::InvalidArgument("changing comparator while building table");
 84    }
 85  
 86    // Note that any live BlockBuilders point to rep_->options and therefore
 87    // will automatically pick up the updated options.
 88    rep_->options = options;
 89    rep_->index_block_options = options;
 90    rep_->index_block_options.block_restart_interval = 1;
 91    return Status::OK();
 92  }
 93  
 94  void TableBuilder::Add(const Slice& key, const Slice& value) {
 95    Rep* r = rep_;
 96    assert(!r->closed);
 97    if (!ok()) return;
 98    if (r->num_entries > 0) {
 99      assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
100    }
101  
102    if (r->pending_index_entry) {
103      assert(r->data_block.empty());
104      r->options.comparator->FindShortestSeparator(&r->last_key, key);
105      std::string handle_encoding;
106      r->pending_handle.EncodeTo(&handle_encoding);
107      r->index_block.Add(r->last_key, Slice(handle_encoding));
108      r->pending_index_entry = false;
109    }
110  
111    if (r->filter_block != nullptr) {
112      r->filter_block->AddKey(key);
113    }
114  
115    r->last_key.assign(key.data(), key.size());
116    r->num_entries++;
117    r->data_block.Add(key, value);
118  
119    const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
120    if (estimated_block_size >= r->options.block_size) {
121      Flush();
122    }
123  }
124  
125  void TableBuilder::Flush() {
126    Rep* r = rep_;
127    assert(!r->closed);
128    if (!ok()) return;
129    if (r->data_block.empty()) return;
130    assert(!r->pending_index_entry);
131    WriteBlock(&r->data_block, &r->pending_handle);
132    if (ok()) {
133      r->pending_index_entry = true;
134      r->status = r->file->Flush();
135    }
136    if (r->filter_block != nullptr) {
137      r->filter_block->StartBlock(r->offset);
138    }
139  }
140  
141  void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
142    // File format contains a sequence of blocks where each block has:
143    //    block_data: uint8[n]
144    //    type: uint8
145    //    crc: uint32
146    assert(ok());
147    Rep* r = rep_;
148    Slice raw = block->Finish();
149  
150    Slice block_contents;
151    CompressionType type = r->options.compression;
152    // TODO(postrelease): Support more compression options: zlib?
153    switch (type) {
154      case kNoCompression:
155        block_contents = raw;
156        break;
157  
158      case kSnappyCompression: {
159        std::string* compressed = &r->compressed_output;
160        if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
161            compressed->size() < raw.size() - (raw.size() / 8u)) {
162          block_contents = *compressed;
163        } else {
164          // Snappy not supported, or compressed less than 12.5%, so just
165          // store uncompressed form
166          block_contents = raw;
167          type = kNoCompression;
168        }
169        break;
170      }
171    }
172    WriteRawBlock(block_contents, type, handle);
173    r->compressed_output.clear();
174    block->Reset();
175  }
176  
177  void TableBuilder::WriteRawBlock(const Slice& block_contents,
178                                   CompressionType type, BlockHandle* handle) {
179    Rep* r = rep_;
180    handle->set_offset(r->offset);
181    handle->set_size(block_contents.size());
182    r->status = r->file->Append(block_contents);
183    if (r->status.ok()) {
184      char trailer[kBlockTrailerSize];
185      trailer[0] = type;
186      uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
187      crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
188      EncodeFixed32(trailer + 1, crc32c::Mask(crc));
189      r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
190      if (r->status.ok()) {
191        r->offset += block_contents.size() + kBlockTrailerSize;
192      }
193    }
194  }
195  
196  Status TableBuilder::status() const { return rep_->status; }
197  
198  Status TableBuilder::Finish() {
199    Rep* r = rep_;
200    Flush();
201    assert(!r->closed);
202    r->closed = true;
203  
204    BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
205  
206    // Write filter block
207    if (ok() && r->filter_block != nullptr) {
208      WriteRawBlock(r->filter_block->Finish(), kNoCompression,
209                    &filter_block_handle);
210    }
211  
212    // Write metaindex block
213    if (ok()) {
214      BlockBuilder meta_index_block(&r->options);
215      if (r->filter_block != nullptr) {
216        // Add mapping from "filter.Name" to location of filter data
217        std::string key = "filter.";
218        key.append(r->options.filter_policy->Name());
219        std::string handle_encoding;
220        filter_block_handle.EncodeTo(&handle_encoding);
221        meta_index_block.Add(key, handle_encoding);
222      }
223  
224      // TODO(postrelease): Add stats and other meta blocks
225      WriteBlock(&meta_index_block, &metaindex_block_handle);
226    }
227  
228    // Write index block
229    if (ok()) {
230      if (r->pending_index_entry) {
231        r->options.comparator->FindShortSuccessor(&r->last_key);
232        std::string handle_encoding;
233        r->pending_handle.EncodeTo(&handle_encoding);
234        r->index_block.Add(r->last_key, Slice(handle_encoding));
235        r->pending_index_entry = false;
236      }
237      WriteBlock(&r->index_block, &index_block_handle);
238    }
239  
240    // Write footer
241    if (ok()) {
242      Footer footer;
243      footer.set_metaindex_handle(metaindex_block_handle);
244      footer.set_index_handle(index_block_handle);
245      std::string footer_encoding;
246      footer.EncodeTo(&footer_encoding);
247      r->status = r->file->Append(footer_encoding);
248      if (r->status.ok()) {
249        r->offset += footer_encoding.size();
250      }
251    }
252    return r->status;
253  }
254  
255  void TableBuilder::Abandon() {
256    Rep* r = rep_;
257    assert(!r->closed);
258    r->closed = true;
259  }
260  
261  uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
262  
263  uint64_t TableBuilder::FileSize() const { return rep_->offset; }
264  
265  }  // namespace leveldb