#codex/upload #codex/download

| related | [[Codex Blocks]], [[Block Storage]], [[Codex Block Exchange Protocol]] |
| ------- | ---------------------------------------------------------------------- |

For a high-level overview of how Codex works, see the [[Codex-BitTorrent Integration presentation]] slides.

For a more detailed view of the block exchange architecture, refer to [Codex Block Exchange Architecture](https://link.excalidraw.com/readonly/L0Rz0LU3oUBDjpHh9rIp).

## Uploading content

Content is uploaded with the `/api/codex/v1/data` API. The handler defined in `codex/rest/api.nim` calls `CodexNodeRef.store` and then returns the `Cid` of the manifest corresponding to the uploaded content:

```nim
without cid =? (
  await node.store(
    AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
    filename = filename,
    mimetype = mimetype,
  )
), error:
  error "Error uploading file", exc = error.msg
  return RestApiResponse.error(Http500, error.msg)

codex_api_uploads.inc()
trace "Uploaded file", cid
return RestApiResponse.response($cid)
```

> See [Using Codex](https://docs.codex.storage/learn/using) in the Codex docs for how to use the Codex client.
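A minimal client-side sketch of the upload call, in Python with only the standard library. The node address `http://localhost:8080` and the `application/octet-stream` content type are assumptions for illustration; the endpoint path and the plain-text `Cid` response come from the handler above.

```python
import urllib.request

# Assumption: a locally running Codex node serves its REST API at this address.
BASE_URL = "http://localhost:8080"


def build_upload_request(data: bytes, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a POST to the upload endpoint."""
    return urllib.request.Request(
        url=f"{base_url}/api/codex/v1/data",
        data=data,
        method="POST",
        # Assumption: the raw file bytes are sent as an octet stream.
        headers={"Content-Type": "application/octet-stream"},
    )


def upload(data: bytes, base_url: str = BASE_URL) -> str:
    """Send the upload; the handler replies with the manifest Cid as text ($cid)."""
    with urllib.request.urlopen(build_upload_request(data, base_url)) as resp:
        return resp.read().decode()
```

With a node listening at that address, `upload(open("file.bin", "rb").read())` would return the manifest `Cid` string.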

`node.store` (`codex/node.nim`) reads the data from the stream chunk by chunk. For each `chunk`, it computes a multihash, builds a `Cid` from it, wraps the chunk in a block, records the `cid`, and stores the block:

```nim
while (let chunk = await chunker.getBytes(); chunk.len > 0):
  without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
    return failure(err)

  without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
    return failure(err)

  without blk =? bt.Block.new(cid, chunk, verify = false):
    return failure("Unable to init block from chunk!")

  cids.add(cid)

  if err =? (await self.networkStore.putBlock(blk)).errorOption:
    error "Unable to store block", cid = blk.cid, err = err.msg
    return failure(&"Unable to store block {blk.cid}")
```
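The loop above can be sketched in Python as follows, under simplifying assumptions: the hash function is taken to be SHA-256 (we assume this is Codex's default `hcodec`), the bare multihash stands in for the full `Cid` (which additionally encodes the CID version and `dataCodec`), and a plain `dict` stands in for `networkStore`. The chunk size is 64 KiB, matching `DefaultBlockSize`.

```python
import hashlib

CHUNK_SIZE = 64 * 1024  # 64 KiB, matching DefaultBlockSize

# Multihash header for sha2-256: function code 0x12, digest length 0x20 (32 bytes).
# Both values are below 0x80, so each fits in a single varint byte.
SHA2_256_HEADER = bytes([0x12, 0x20])


def chunks(data: bytes, size: int = CHUNK_SIZE):
    """Yield consecutive slices of at most `size` bytes (cf. chunker.getBytes)."""
    for start in range(0, len(data), size):
        yield data[start:start + size]


def sha256_multihash(chunk: bytes) -> bytes:
    """Multihash = <function code><digest length><digest>."""
    return SHA2_256_HEADER + hashlib.sha256(chunk).digest()


def store_chunks(data: bytes, block_store: dict) -> list:
    """Hypothetical stand-in for the node.store loop."""
    ids = []
    for chunk in chunks(data):
        block_id = sha256_multihash(chunk)  # stands in for the block's Cid
        ids.append(block_id)                # like cids.add(cid)
        block_store[block_id] = chunk       # like networkStore.putBlock(blk)
    return ids
```

In this sketch, identical chunks map to the same identifier and are stored once, while the ordered `ids` list still records every position in the file.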

The default chunk size is the default block size, 64 KiB, defined in `codex/codextypes.nim`:

```nim
const
  # Size of blocks for storage / network exchange
  DefaultBlockSize* = NBytes 1024 * 64
```
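Since every chunk except possibly the last is exactly `DefaultBlockSize` bytes, the number of blocks in a dataset is a ceiling division. A short Python illustration (it ignores any padding the chunker might apply to the final chunk):

```python
DEFAULT_BLOCK_SIZE = 1024 * 64  # 65,536 bytes


def block_count(dataset_size: int, block_size: int = DEFAULT_BLOCK_SIZE) -> int:
    """Ceiling division: how many chunks a dataset of dataset_size bytes yields."""
    return (dataset_size + block_size - 1) // block_size


def last_block_size(dataset_size: int, block_size: int = DEFAULT_BLOCK_SIZE) -> int:
    """Size of the final (possibly partial) chunk; 0 for an empty dataset."""
    if dataset_size == 0:
        return 0
    return dataset_size - (block_count(dataset_size, block_size) - 1) * block_size
```

For example, a 1 MiB file (1,048,576 bytes) yields 16 full blocks, while a 100,000-byte file yields 2 blocks, the second holding 34,464 bytes.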

### Storing blocks

For each block, `node.store` calls `NetworkStore.putBlock`:

```nim
method putBlock*(
    self: NetworkStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
  ## Store block locally and notify the network
  ##
  let res = await self.localStore.putBlock(blk, ttl)
  if res.isErr:
    return res

  await self.engine.resolveBlocks(@[blk])
  return success()
```
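The ordering in `NetworkStore.putBlock` matters: the network is only notified after the local write succeeds. A minimal Python asyncio sketch of that pattern, with hypothetical `LocalStore` and `Engine` classes standing in for `RepoStore` and the block exchange engine (what `resolveBlocks` actually does is covered later):

```python
import asyncio
from typing import Optional


class LocalStore:
    """Hypothetical in-memory stand-in for RepoStore."""

    def __init__(self) -> None:
        self.blocks = {}

    async def put_block(self, cid: str, data: bytes) -> Optional[str]:
        self.blocks[cid] = data
        return None  # None means success in this sketch; a string is an error


class Engine:
    """Hypothetical stand-in for the exchange engine; it only records notifications."""

    def __init__(self) -> None:
        self.resolved = []

    async def resolve_blocks(self, cids) -> None:
        self.resolved.extend(cids)


class NetworkStore:
    def __init__(self, local: LocalStore, engine: Engine) -> None:
        self.local = local
        self.engine = engine

    async def put_block(self, cid: str, data: bytes) -> Optional[str]:
        err = await self.local.put_block(cid, data)
        if err is not None:
            return err  # local write failed: do not notify the network
        await self.engine.resolve_blocks([cid])
        return None
```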

It first stores the block locally, via `RepoStore.putBlock`:

```nim
method putBlock*(
    self: RepoStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
  ## Put a block to the blockstore
  ##

  logScope:
    cid = blk.cid

  let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds

  without res =? await self.storeBlock(blk, expiry), err:
    return failure(err)

  if res.kind == Stored:
    trace "Block Stored"
    if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
      # rollback changes
      without delRes =? await self.tryDeleteBlock(blk.cid), err:
        return failure(err)
      return failure(err)

    if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
      return failure(err)

    if onBlock =? self.onBlockStored:
      await onBlock(blk.cid)
  else:
    trace "Block already exists"

  return success()
```
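The bookkeeping in `RepoStore.putBlock` can be sketched synchronously in Python. The quota rule used here (reject a write that would push usage above a fixed maximum) is an assumption standing in for `updateQuotaUsage`; the rest follows the code above: only newly stored blocks are counted, the write is rolled back if the quota update fails, and the optional `onBlockStored` callback fires last. `RepoStoreSketch` is hypothetical.

```python
class QuotaExceededError(Exception):
    pass


class RepoStoreSketch:
    def __init__(self, quota_max: int) -> None:
        self.quota_max = quota_max
        self.quota_used = 0
        self.total_blocks = 0
        self.blocks = {}
        self.on_block_stored = None  # optional callback, like onBlockStored

    def _store_block(self, cid: str, data: bytes) -> bool:
        """Return True if the block was newly stored, False if already present."""
        if cid in self.blocks:
            return False
        self.blocks[cid] = data
        return True

    def _update_quota_usage(self, plus_used: int) -> None:
        # Assumption: a hard upper limit on used bytes.
        if self.quota_used + plus_used > self.quota_max:
            raise QuotaExceededError(f"need {plus_used} bytes, quota exhausted")
        self.quota_used += plus_used

    def put_block(self, cid: str, data: bytes) -> None:
        if not self._store_block(cid, data):
            return  # "Block already exists": nothing to account for
        try:
            self._update_quota_usage(len(data))
        except QuotaExceededError:
            del self.blocks[cid]  # roll back the write, then report the error
            raise
        self.total_blocks += 1
        if self.on_block_stored is not None:
            self.on_block_stored(cid)
```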

`storeBlock`, defined in `codex/stores/repostore/operations.nim`, is where both the block and its metadata are stored:

```nim
proc storeBlock*(
    self: RepoStore, blk: Block, minExpiry: SecondsSince1970
): Future[?!StoreResult] {.async.} =
  if blk.isEmpty:
    return success(StoreResult(kind: AlreadyInStore))

  without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
    return failure(err)

  without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
    return failure(err)

  await self.metaDs.modifyGet(
    ...
  )
```

The `modifyGet` call deserves separate treatment.

### modifyGet

`modifyGet` is called on `metaDs`, which has a wrapper type:

```nim
TypedDatastore* = ref object of RootObj
  ds*: Datastore
```

Here, `ds` is a `LevelDbDatastore`:

```nim
type
  LevelDbDatastore* = ref object of Datastore
    db: LevelDb
    locks: TableRef[Key, AsyncLock]
```

A cascade of callbacks goes from `RepoStore` through `TypedDatastore` down to `LevelDbDatastore`, as shown in the following sequence diagram:

![[repostore_storeblock.svg]]

> The diagram above can also be viewed [online](https://www.mermaidchart.com/app/projects/2564c095-670f-4258-b8ea-1c5d0b546845/diagrams/b0ba3207-7833-4fd0-b5b1-9d076585d93a/version/v0.1/edit) (requires a [Mermaid Chart](https://www.mermaidchart.com) account).

`LevelDbDatastore` interacts directly with the underlying storage and ensures the atomicity of the `modifyGet` operation. `TypedDatastore` performs *encoding* and *decoding* of the data. Finally, `RepoStore` creates or updates the metadata and also writes the actual block to the underlying block storage via its `repoDs` field.
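The essence of `modifyGet` is an atomic read-modify-write on one key: take the key's lock, read the current value (if any), let a callback compute the new value plus an auxiliary result, write the new value back, and return the auxiliary result. The Python sketch below mirrors that idea with one `asyncio.Lock` per key, analogous to the `locks` table in `LevelDbDatastore`. `LockedKV` is hypothetical, and the callback contract (returning `None` deletes the key) is our assumption, not a quote of the datastore API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple

# Callback: current value (None if absent) -> (new value or None, auxiliary result)
ModifyGet = Callable[[Optional[bytes]], Awaitable[Tuple[Optional[bytes], bytes]]]


class LockedKV:
    """Dict-backed stand-in for LevelDbDatastore, with one lock per key."""

    def __init__(self) -> None:
        self.data: Dict[str, bytes] = {}
        self.locks: Dict[str, asyncio.Lock] = {}

    async def modify_get(self, key: str, fn: ModifyGet) -> bytes:
        lock = self.locks.get(key)
        if lock is None:
            lock = self.locks[key] = asyncio.Lock()
        async with lock:                         # serialize updates to this key
            current = self.data.get(key)         # read
            new_value, aux = await fn(current)   # modify (the callback may await)
            if new_value is None:
                self.data.pop(key, None)         # None: delete / keep absent
            else:
                self.data[key] = new_value       # write back
            return aux
```

In `storeBlock`, the callback passed to `modifyGet` is where `RepoStore` creates or updates the block's metadata and writes the block to `repoDs`.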

After the blocks are stored in `repoDs`, `node.store` (`CodexNodeRef.store`) builds a Merkle tree over the block CIDs and computes its root CID (`treeCid`). Finally, for each block it computes the [[Codex Merkle Proofs|inclusion proof]] and stores the block's `cid`, its `index`, and the `proof` under the computed `treeCid`:

```nim
without tree =? CodexTree.init(cids), err:
  return failure(err)

without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
  return failure(err)

for index, cid in cids:
  without proof =? tree.getProof(index), err:
    return failure(err)
  if err =?
      (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
    # TODO add log here
    return failure(err)
```
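To make the tree and proof step concrete, here is a plain binary SHA-256 Merkle tree with inclusion proofs, in Python. It illustrates the technique only: it does not reproduce `CodexTree`'s actual hashing and padding rules, its leaves are arbitrary byte strings standing in for block CIDs, and padding an odd level by duplicating its last node is an assumption of this sketch.

```python
import hashlib
from typing import List


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_levels(leaves: List[bytes]) -> List[List[bytes]]:
    """All tree levels, leaves first. An odd node is paired with a copy of itself."""
    levels = [list(leaves)]
    while len(levels[-1]) > 1:
        level = levels[-1]
        if len(level) % 2:
            level = level + [level[-1]]
        levels.append([h(level[i] + level[i + 1]) for i in range(0, len(level), 2)])
    return levels


def root(leaves: List[bytes]) -> bytes:
    return build_levels(leaves)[-1][0]


def get_proof(leaves: List[bytes], index: int) -> List[bytes]:
    """Sibling hashes from the leaf level up to, but excluding, the root."""
    proof = []
    for level in build_levels(leaves)[:-1]:
        if len(level) % 2:
            level = level + [level[-1]]
        proof.append(level[index ^ 1])  # sibling: flip the lowest bit of the index
        index //= 2
    return proof


def verify(leaf: bytes, index: int, proof: List[bytes], expected_root: bytes) -> bool:
    node = leaf
    for sibling in proof:
        node = h(node + sibling) if index % 2 == 0 else h(sibling + node)
        index //= 2
    return node == expected_root
```

Presumably, storing `(treeCid, index) -> (cid, proof)`, as `putCidAndProof` does, is what lets a node serving block `index` of a dataset also hand out its proof, which a receiver can check against `treeCid` alone.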

This concludes local block storage. Although not shown above, `node.store` then wraps `treeCid` in a `Manifest`, stores the manifest as a block of its own, and returns that block's `Cid`, which is the `Cid` the upload API responds with. We leave the description of `engine.resolveBlocks(@[blk])` for later, when describing the block exchange protocol.

## Downloading content

To download content from the network, we use the `/api/codex/v1/data/{cid}/network/stream` API, whose handler calls `await node.retrieveCid(cid.get(), local = false, resp = resp)`.

`node.retrieveCid` first tries to get a stream (a descendant of libp2p's `LPStream`):

```nim
without stream =? (await node.retrieve(cid, local)), error:
  if error of BlockNotFoundError:
    resp.status = Http404
    return await resp.sendBody("")
  else:
    resp.status = Http500
    return await resp.sendBody(error.msg)
```

The `stream` is then read in chunks of `DefaultBlockSize` bytes, and each chunk is sent to the client.
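Seen from the client side, the download is a plain HTTP stream. The Python sketch below builds the download URL (the node address is again an assumed `http://localhost:8080`) and mimics the server's read loop: read at most `DefaultBlockSize` bytes at a time until the stream is exhausted. Any binary file-like object, such as `io.BytesIO`, stands in for the `LPStream`, and Python's `read` is used in place of `readOnce`.

```python
from typing import BinaryIO, Iterator

DEFAULT_BLOCK_SIZE = 64 * 1024
BASE_URL = "http://localhost:8080"  # assumption: local node address


def download_url(cid: str, base_url: str = BASE_URL) -> str:
    return f"{base_url}/api/codex/v1/data/{cid}/network/stream"


def stream_body(stream: BinaryIO, chunk_size: int = DEFAULT_BLOCK_SIZE) -> Iterator[bytes]:
    """Yield chunks of at most chunk_size bytes until EOF (an empty read)."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk
```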

To see what the `stream` actually is, we need to look at `node.retrieve(cid, local)` (`local` is `false` in this case):

```nim
proc retrieve*(
    self: CodexNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async.} =
  ## Retrieve by Cid a single block or an entire dataset described by manifest
  ##

  if local and not await (cid in self.networkStore):
    return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))

  without manifest =? (await self.fetchManifest(cid)), err:
    if err of AsyncTimeoutError:
      return failure(err)

    return await self.streamSingleBlock(cid)

  await self.streamEntireDataset(manifest, cid)
```
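The branching in `retrieve` is easy to lose among the `without` blocks, so here it is as a small synchronous Python function with its collaborators passed in as callables. All names are hypothetical stand-ins, and `TimeoutError` plays the role of `AsyncTimeoutError`.

```python
class BlockNotFoundError(Exception):
    pass


def retrieve(cid, local, in_local_store, fetch_manifest, stream_single_block, stream_entire_dataset):
    """Return a stream for `cid`, following the decision logic of node.retrieve."""
    if local and not in_local_store(cid):
        raise BlockNotFoundError("Block not found in local store")
    try:
        manifest = fetch_manifest(cid)
    except TimeoutError:
        raise  # a timeout is reported as a failure, not reinterpreted
    except Exception:
        return stream_single_block(cid)  # not a manifest: serve the CID as one block
    return stream_entire_dataset(manifest, cid)
```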

`retrieve` first tries to get the manifest with `self.fetchManifest(cid)`. If that fails with anything other than a timeout, the `cid` is treated as a single block and streamed with `streamSingleBlock`; otherwise the whole dataset described by the manifest is streamed with `streamEntireDataset`. Here is `fetchManifest`:

```nim
proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
  ## Fetch and decode a manifest block
  ##

  if err =? cid.isManifest.errorOption:
    return failure(&"CID has invalid content type for manifest {$cid}")

  trace "Retrieving manifest for cid", cid

  without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
    trace "Error retrieve manifest block", cid, err = err.msg
    return failure err

  trace "Decoding manifest for cid", cid

  without manifest =? Manifest.decode(blk), err:
    trace "Unable to decode as manifest", err = err.msg
    return failure("Unable to decode as manifest")

  trace "Decoded manifest", cid

  return manifest.success
```

The manifest is a ***single block***, so we get it with:

```nim
self.networkStore.getBlock(BlockAddress.init(cid))
```

Here `BlockAddress.init(cid)` reduces to `BlockAddress(leaf: false, cid: cid)`, which means our [[Codex Blocks|BlockAddress]] is just a `Cid`. `NetworkStore.getBlock` first tries to get the block from the `localStore`, and asks the block exchange engine for it only if the local lookup fails with `BlockNotFoundError`:

```nim
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
  without blk =? (await self.localStore.getBlock(address)), err:
    if not (err of BlockNotFoundError):
      error "Error getting block from local store", address, err = err.msg
      return failure err

    without newBlock =? (await self.engine.requestBlock(address)), err:
      error "Unable to get block from exchange engine", address, err = err.msg
      return failure err

    return success newBlock

  return success blk
```

The `localStore` is a `RepoStore`, whose block datastore is by default an `FSDatastore`. Its `getBlock` dispatches on the kind of address:

```nim
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
  ## Get a block from the blockstore
  ##

  if address.leaf:
    self.getBlock(address.treeCid, address.index)
  else:
    self.getBlock(address.cid)
```
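A Python model of the two address kinds and the dispatch above. That a leaf address resolves through the `(treeCid, index) -> cid` mapping recorded by `putCidAndProof` is our reading of the code rather than something shown here, and `MiniRepo` is hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BlockAddress:
    leaf: bool
    cid: Optional[str] = None       # set when leaf is False
    tree_cid: Optional[str] = None  # set when leaf is True
    index: int = 0                  # position of the block in the tree

    @staticmethod
    def init(cid: str) -> "BlockAddress":
        """Mirror of BlockAddress.init(cid) == BlockAddress(leaf: false, cid: cid)."""
        return BlockAddress(leaf=False, cid=cid)

    def cid_or_tree_cid(self) -> Optional[str]:
        return self.tree_cid if self.leaf else self.cid


class MiniRepo:
    def __init__(self) -> None:
        self.blocks = {}  # cid -> block bytes
        self.leaves = {}  # (tree_cid, index) -> cid

    def put_cid_and_proof(self, tree_cid: str, index: int, cid: str) -> None:
        self.leaves[(tree_cid, index)] = cid  # the proof is omitted in this sketch

    def get_block(self, address: BlockAddress) -> bytes:
        if address.leaf:
            return self.blocks[self.leaves[(address.tree_cid, address.index)]]
        return self.blocks[address.cid]
```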

Since `leaf` is `false` here, the simpler `getBlock` variant is used:

```nim
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
  ## Get a block from the blockstore
  ##

  logScope:
    cid = cid

  if cid.isEmpty:
    trace "Empty block, ignoring"
    return cid.emptyBlock

  without key =? makePrefixKey(self.postFixLen, cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err)

  without data =? await self.repoDs.get(key), err:
    if not (err of DatastoreKeyNotFound):
      trace "Error getting block from datastore", err = err.msg, key
      return failure(err)

    return failure(newException(BlockNotFoundError, err.msg))

  trace "Got block for cid", cid
  return Block.new(cid, data, verify = true)
```
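`Block.new(cid, data, verify = true)` means the data read from disk is re-hashed and checked against the requested `cid`. A Python sketch of this lookup path, reusing the simplification of a SHA-256 multihash as the block identifier (an assumption, not Codex's full `Cid`); `BlockNotFoundError` mirrors the error that `DatastoreKeyNotFound` is mapped to:

```python
import hashlib


class BlockNotFoundError(Exception):
    pass


def make_id(data: bytes) -> bytes:
    """Simplified block identifier: sha2-256 multihash (0x12, 0x20, digest)."""
    return bytes([0x12, 0x20]) + hashlib.sha256(data).digest()


EMPTY_ID = make_id(b"")


def get_block(repo: dict, block_id: bytes, verify: bool = True) -> bytes:
    if block_id == EMPTY_ID:
        return b""  # the empty block is synthesized, never looked up
    try:
        data = repo[block_id]
    except KeyError:
        raise BlockNotFoundError(block_id.hex()) from None
    if verify and make_id(data) != block_id:
        raise ValueError("block data does not match its identifier")
    return data
```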

If the block is not in the `localStore`, `NetworkStore.getBlock` tries to get it from the network with `self.engine.requestBlock(address)`:

```nim
proc requestBlock*(
    b: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async.} =
  let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)

  if not b.pendingBlocks.isInFlight(address):
    let peers = b.peers.getPeersForBlock(address)

    if peers.with.len == 0:
      b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
    else:
      let selected = pickPseudoRandom(address, peers.with)
      asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
      b.pendingBlocks.setInFlight(address)
      await b.sendWantBlock(@[address], selected)

    await b.sendWantHave(@[address], peers.without)

  # Don't let timeouts bubble up. We can't be too broad here or we break
  # cancellations.
  try:
    success await blockFuture
  except AsyncTimeoutError as err:
    failure err
```

`requestBlock` first registers a want handle for the address. If the block is not already in flight, it sends a want-have request to every peer not known to have the block; if some peer is known to have it, one of them is picked with `pickPseudoRandom` and sent a want-block request, otherwise a discovery request is queued. This is also where the [[Codex WantList]] topic becomes relevant (and perhaps also [[Codex Block Exchange Protocol]]).
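A Python asyncio sketch of this flow, together with the local-first lookup of `NetworkStore.getBlock`. `Peer`, `Engine`, the message tuples, and the hash-based peer choice are all hypothetical stand-ins: the real engine sends block exchange messages over libp2p, its `pickPseudoRandom` may choose differently, and `monitorBlockHandle` is left out. A future per address plays the role of the want handle, and `resolve` completes it when the block arrives.

```python
import asyncio
import hashlib


class Peer:
    """Hypothetical peer: knows which addresses it has, records what it is sent."""

    def __init__(self, pid: str, have) -> None:
        self.id, self.have, self.sent = pid, set(have), []

    async def send(self, kind: str, address: str) -> None:
        self.sent.append((kind, address))


class Engine:
    def __init__(self, peers, timeout: float = 1.0) -> None:
        self.peers = peers
        self.timeout = timeout
        self.pending = {}  # address -> Future (the "want handle")
        self.in_flight = set()
        self.discovery_queue = []

    @staticmethod
    def pick(address: str, candidates):
        # Deterministic choice keyed on the address (an assumption of this sketch).
        digest = hashlib.sha256(address.encode()).digest()
        return candidates[int.from_bytes(digest[:4], "big") % len(candidates)]

    async def request_block(self, address: str):
        fut = self.pending.get(address)
        if fut is None:
            fut = self.pending[address] = asyncio.get_running_loop().create_future()
        if address not in self.in_flight:
            with_block = [p for p in self.peers if address in p.have]
            without_block = [p for p in self.peers if address not in p.have]
            if not with_block:
                self.discovery_queue.append(address)  # cf. queueFindBlocksReq
            else:
                self.in_flight.add(address)
                await self.pick(address, with_block).send("want-block", address)
            for peer in without_block:
                await peer.send("want-have", address)
        try:
            # shield: one caller timing out must not cancel the shared want handle
            return await asyncio.wait_for(asyncio.shield(fut), self.timeout)
        except asyncio.TimeoutError:
            return None  # stand-in for `failure err`

    def resolve(self, address: str, data: bytes) -> None:
        """Complete the want handle when the block arrives (cf. resolveBlocks)."""
        self.in_flight.discard(address)
        fut = self.pending.pop(address, None)
        if fut is not None and not fut.done():
            fut.set_result(data)


async def get_block(local: dict, engine: Engine, address: str):
    """Local-first lookup, falling back to the network (cf. NetworkStore.getBlock)."""
    if address in local:
        return local[address]
    return await engine.request_block(address)
```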

Once we have the manifest, `streamEntireDataset` creates a stream through which the data is streamed down to the client (for example, a browser):

```nim
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
```

`StoreStream` provides a `readOnce` method that retrieves the blocks from the `networkStore` and copies the requested bytes into the caller's buffer. `node.retrieveCid` calls `readOnce` repeatedly and sends each chunk it reads to the client.
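How a byte-oriented `readOnce` can sit on top of fixed-size blocks is sketched below in Python: the current offset is split into a block index and an offset within that block with `divmod`, and reading continues across block boundaries until the request is satisfied or the dataset ends. `StoreStreamSketch` and its `get_block` callable (standing in for the `networkStore` lookups) are hypothetical, and treating `pad = false` as "stop at the dataset size instead of emitting padding" is our assumption. Unlike a real stream, this sketch returns `b""` at the end instead of signalling end-of-stream.

```python
from typing import Callable


class StoreStreamSketch:
    def __init__(self, get_block: Callable[[int], bytes], block_size: int, dataset_size: int) -> None:
        self.get_block = get_block  # block index -> block bytes
        self.block_size = block_size
        self.dataset_size = dataset_size
        self.offset = 0  # current read position in the dataset

    def at_eof(self) -> bool:
        return self.offset >= self.dataset_size

    def read_once(self, nbytes: int) -> bytes:
        """Read up to nbytes from the current offset, crossing blocks as needed."""
        out = bytearray()
        while len(out) < nbytes and not self.at_eof():
            index, within = divmod(self.offset, self.block_size)
            n = min(
                self.dataset_size - self.offset,  # do not read past the dataset
                nbytes - len(out),                # do not overfill the request
                self.block_size - within,         # do not run past this block
            )
            out += self.get_block(index)[within:within + n]
            self.offset += n
        return bytes(out)
```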