# Uploading and downloading content in Codex

#codex/upload #codex/download

| related | [[Codex Blocks]], [[Block Storage]], [[Codex Block Exchange Protocol]] |
| ------- | ---------------------------------------------------------------------- |

For a high-level overview of how Codex works, see the [[Codex-BitTorrent Integration presentation]] slides.

For a more detailed overview of the block exchange architecture, please refer to [Codex Block Exchange Architecture](https://link.excalidraw.com/readonly/L0Rz0LU3oUBDjpHh9rIp).

We upload content through the `/api/codex/v1/data` API. The handler defined in `codex/rest/api.nim` calls `CodexNodeRef.store` and then returns the `Cid` of the manifest corresponding to the uploaded content:

```nim
without cid =? (
  await node.store(
    AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
    filename = filename,
    mimetype = mimetype,
  )
), error:
  error "Error uploading file", exc = error.msg
  return RestApiResponse.error(Http500, error.msg)

codex_api_uploads.inc()
trace "Uploaded file", cid
return RestApiResponse.response($cid)
```

> See [Using Codex](https://docs.codex.storage/learn/using) in the Codex docs for how to use the Codex client.

`node.store` (`codex/node.nim`) reads data from the stream chunk by chunk and does the following for each `chunk`:

```nim
while (let chunk = await chunker.getBytes(); chunk.len > 0):
  without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
    return failure(err)

  without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
    return failure(err)

  without blk =? bt.Block.new(cid, chunk, verify = false):
    return failure("Unable to init block from chunk!")

  cids.add(cid)

  if err =? (await self.networkStore.putBlock(blk)).errorOption:
    error "Unable to store block", cid = blk.cid, err = err.msg
    return failure(&"Unable to store block {blk.cid}")
```

The default chunk size is given by the default block size defined in `codex/codextypes.nim`, that is 64 KiB (65,536 bytes):

```nim
const
  # Size of blocks for storage / network exchange,
  DefaultBlockSize* = NBytes 1024 * 64
```

### Storing blocks

Now, let us look at `networkStore.putBlock`:

```nim
method putBlock*(
    self: NetworkStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
  ## Store block locally and notify the network
  ##
  let res = await self.localStore.putBlock(blk, ttl)
  if res.isErr:
    return res

  await self.engine.resolveBlocks(@[blk])
  return success()
```

We first store the block locally, in the `RepoStore`:

```nim
method putBlock*(
    self: RepoStore, blk: Block, ttl = Duration.none
): Future[?!void] {.async.} =
  ## Put a block to the blockstore
  ##

  logScope:
    cid = blk.cid

  let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds

  without res =? await self.storeBlock(blk, expiry), err:
    return failure(err)

  if res.kind == Stored:
    trace "Block Stored"
    if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
      # rollback changes
      without delRes =? await self.tryDeleteBlock(blk.cid), err:
        return failure(err)
      return failure(err)

    if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
      return failure(err)

    if onBlock =? self.onBlockStored:
      await onBlock(blk.cid)
  else:
    trace "Block already exists"

  return success()
```

The `storeBlock` proc, defined in `codex/stores/repostore/operations.nim`, is where we store the block and its metadata.
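
To make the upload path so far more concrete, here is a minimal in-memory sketch of it. The content is cut into `DefaultBlockSize` chunks and every chunk gets a content-derived key. A repo-like store then writes each new block, updates its quota usage and block counter, and rolls the write back if the quota would be exceeded. `ToyRepo`, `StoreKind` and the use of `std/hashes` as a stand-in for the multihash-based `Cid` are assumptions made for illustration. This is not the Codex implementation: it ignores TTL/expiry, metadata, and the network notification done by `engine.resolveBlocks`.

```nim
import std/[hashes, strutils, tables]

const DefaultBlockSize = 1024 * 64  # same value as DefaultBlockSize in codex/codextypes.nim

type
  StoreKind = enum
    Stored, AlreadyInStore

  ToyRepo = object
    ## Hypothetical in-memory stand-in for RepoStore (not the Codex type).
    blocks: Table[Hash, string]  # content key -> block bytes
    quotaMax, quotaUsed: int     # bytes allowed / bytes used
    totalBlocks: int             # number of distinct blocks stored

proc putBlock(repo: var ToyRepo, data: string): (Hash, StoreKind) =
  ## Store `data` under its content key. A block that is already present is
  ## neither written nor counted again (compare the AlreadyInStore case).
  let key = hash(data)  # non-cryptographic stand-in for the multihash-based Cid
  if key in repo.blocks:
    return (key, AlreadyInStore)
  repo.blocks[key] = data
  if repo.quotaUsed + data.len > repo.quotaMax:
    repo.blocks.del(key)  # roll back the write when the quota update fails
    raise newException(ValueError, "quota exceeded")
  repo.quotaUsed += data.len
  inc repo.totalBlocks
  (key, Stored)

proc store(repo: var ToyRepo, content: string,
           blockSize = DefaultBlockSize): seq[Hash] =
  ## Split `content` into `blockSize` chunks (the last one may be shorter),
  ## store each chunk and keep one key per chunk, like `cids` in node.store.
  var pos = 0
  while pos < content.len:
    let chunk = content[pos ..< min(pos + blockSize, content.len)]
    let (key, _) = repo.putBlock(chunk)
    result.add key
    pos += blockSize

when isMainModule:
  var repo = ToyRepo(quotaMax: 1024 * 1024)
  # 200_000 identical bytes: three identical full chunks plus a 3_392-byte tail.
  let keys = repo.store(repeat('a', 200_000))
  echo keys.len, " chunks, ", repo.totalBlocks, " blocks stored, ",
    repo.quotaUsed, " bytes used"
```

Uploading 200,000 identical bytes produces four chunks: three full 64 KiB chunks and a 3,392-byte tail. Only two distinct blocks are written, because the three full chunks have the same content and therefore the same key. This mirrors how `node.store` adds a cid to `cids` for every chunk, while `RepoStore.putBlock` only updates quota and counters for blocks that were actually `Stored`.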

```nim
proc storeBlock*(
    self: RepoStore, blk: Block, minExpiry: SecondsSince1970
): Future[?!StoreResult] {.async.} =
  if blk.isEmpty:
    return success(StoreResult(kind: AlreadyInStore))

  without metaKey =? createBlockExpirationMetadataKey(blk.cid), err:
    return failure(err)

  without blkKey =? makePrefixKey(self.postFixLen, blk.cid), err:
    return failure(err)

  await self.metaDs.modifyGet(
    ...
  )
```

`modifyGet` deserves separate treatment.

### modifyGet

`modifyGet` is called on `metaDs`, which has a wrapper type:

```nim
TypedDatastore* = ref object of RootObj
  ds*: Datastore
```

And `ds` above is:

```nim
type
  LevelDbDatastore* = ref object of Datastore
    db: LevelDb
    locks: TableRef[Key, AsyncLock]
```

A cascade of callbacks goes from `RepoStore` through `TypedDatastore` down to `LevelDbDatastore`, as shown in the following sequence diagram:

![[repostore_storeblock.svg]]

> The diagram above can also be viewed [online](https://www.mermaidchart.com/app/projects/2564c095-670f-4258-b8ea-1c5d0b546845/diagrams/b0ba3207-7833-4fd0-b5b1-9d076585d93a/version/v0.1/edit) (requires a [Mermaid Chart](https://www.mermaidchart.com) account).

`LevelDbDatastore` interacts directly with the underlying storage and ensures the atomicity of the `modifyGet` operation. `TypedDatastore` performs *encoding* and *decoding* of the data. Finally, `RepoStore` creates or updates the metadata and also writes the actual block to the underlying block storage via its `repoDs` instance variable.

After the blocks are stored in `repoDs`, back in `node.store` (`CodexNodeRef.store`), we build the Merkle tree over the block CIDs and compute its root (`treeCid`).
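
Building a Merkle tree over the block CIDs, taking its root, and producing per-leaf inclusion proofs can be illustrated with a small binary Merkle tree. This is a sketch under explicit assumptions. `std/hashes` stands in for a cryptographic hash. The leaves are arbitrary digests rather than real `Cid`s. An odd node at the end of a layer is paired with a zero placeholder. The names `Digest`, `MerkleProof`, `compress`, `buildLayers` and `verify` are hypothetical, and `CodexTree`'s actual hashing and odd-node handling are not covered in this note.

```nim
import std/hashes

type
  Digest = Hash  # non-cryptographic stand-in for a real digest

  MerkleProof = object
    index: int         # position of the leaf
    path: seq[Digest]  # sibling digests, from the leaf layer upwards

proc compress(a, b: Digest): Digest =
  ## Combine two child digests into their parent digest.
  var h: Hash = 0
  h = h !& a
  h = h !& b
  result = !$h

proc buildLayers(leaves: seq[Digest]): seq[seq[Digest]] =
  ## Return all layers of the tree: leaves first, root layer last.
  doAssert leaves.len > 0, "a tree needs at least one leaf"
  result.add leaves
  while result[^1].len > 1:
    let prev = result[^1]
    var upper: seq[Digest]
    for i in countup(0, prev.high, 2):
      # Assumption: an odd node at the end of a layer is paired with 0.
      let right = if i + 1 < prev.len: prev[i + 1] else: Digest(0)
      upper.add compress(prev[i], right)
    result.add upper

proc root(layers: seq[seq[Digest]]): Digest = layers[^1][0]

proc getProof(layers: seq[seq[Digest]], index: int): MerkleProof =
  ## Collect the sibling of the current node on every layer below the root.
  result.index = index
  var idx = index
  for level in 0 ..< layers.high:
    let sibling = idx xor 1
    let digest =
      if sibling < layers[level].len: layers[level][sibling] else: Digest(0)
    result.path.add digest
    idx = idx div 2

proc verify(proof: MerkleProof, leaf, expectedRoot: Digest): bool =
  ## Recompute the root from the leaf and its proof and compare.
  var acc = leaf
  var idx = proof.index
  for sibling in proof.path:
    acc = if idx mod 2 == 0: compress(acc, sibling) else: compress(sibling, acc)
    idx = idx div 2
  acc == expectedRoot

when isMainModule:
  var leaves: seq[Digest]
  for i in 0 ..< 5:
    leaves.add hash("block-" & $i)  # stand-ins for the block cids
  let layers = buildLayers(leaves)
  for i, leaf in leaves:
    doAssert getProof(layers, i).verify(leaf, layers.root)
  echo "all ", leaves.len, " proofs verify"
```

A proof contains one sibling digest per layer below the root, so its size grows with the logarithm of the number of blocks. That is what makes storing one proof per block under `treeCid` practical.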
Finally, for each block (CID) we compute the [[Codex Merkle Proofs|inclusion proofs]] and store each `cid`, block `index`, and `proof` under the computed `treeCid`:

```nim
without tree =? CodexTree.init(cids), err:
  return failure(err)

without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
  return failure(err)

for index, cid in cids:
  without proof =? tree.getProof(index), err:
    return failure(err)
  if err =?
      (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
    # TODO add log here
    return failure(err)
```

This concludes the local block storage. We leave the description of `engine.resolveBlocks(@[blk])` for later, when describing the block exchange protocol.

## Downloading content

To download content from the network, we use the `/api/codex/v1/data/{cid}/network/stream` API, whose handler calls `await node.retrieveCid(cid.get(), local = false, resp = resp)`.

`node.retrieveCid` tries to get a stream (a descendant of libp2p's `LPStream`):

```nim
without stream =? (await node.retrieve(cid, local)), error:
  if error of BlockNotFoundError:
    resp.status = Http404
    return await resp.sendBody("")
  else:
    resp.status = Http500
    return await resp.sendBody(error.msg)
```

This `stream` is then read chunk by chunk (in chunks of `DefaultBlockSize`) and sent to the client.

To see what the `stream` actually is, we need to dive into `node.retrieve(cid, local)` (`local` is `false` in this case):

```nim
proc retrieve*(
    self: CodexNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async.} =
  ## Retrieve by Cid a single block or an entire dataset described by manifest
  ##

  if local and not await (cid in self.networkStore):
    return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))

  without manifest =? (await self.fetchManifest(cid)), err:
    if err of AsyncTimeoutError:
      return failure(err)

    return await self.streamSingleBlock(cid)

  await self.streamEntireDataset(manifest, cid)
```

If fetching the manifest fails with anything other than a timeout, `retrieve` falls back to streaming `cid` as a single block; otherwise it streams the entire dataset described by the manifest. We first try to get the manifest with `self.fetchManifest(cid)`:

```nim
proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
  ## Fetch and decode a manifest block
  ##

  if err =? cid.isManifest.errorOption:
    return failure "CID has invalid content type for manifest {$cid}"

  trace "Retrieving manifest for cid", cid

  without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
    trace "Error retrieve manifest block", cid, err = err.msg
    return failure err

  trace "Decoding manifest for cid", cid

  without manifest =? Manifest.decode(blk), err:
    trace "Unable to decode as manifest", err = err.msg
    return failure("Unable to decode as manifest")

  trace "Decoded manifest", cid

  return manifest.success
```

The manifest is a ***single block***, and we get it with:

```nim
self.networkStore.getBlock(BlockAddress.init(cid))
```

Here `BlockAddress.init(cid)` reduces to `BlockAddress(leaf: false, cid: cid)`, which means our [[Codex Blocks|BlockAddress]] is just a `Cid`. `getBlock` first tries to get the block from the `localStore` and only asks the block exchange engine if the block is not found there:

```nim
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
  without blk =? (await self.localStore.getBlock(address)), err:
    if not (err of BlockNotFoundError):
      error "Error getting block from local store", address, err = err.msg
      return failure err

    without newBlock =? (await self.engine.requestBlock(address)), err:
      error "Unable to get block from exchange engine", address, err = err.msg
      return failure err

    return success newBlock

  return success blk
```

The `localStore` is a `RepoStore`, whose block datastore is by default an `FSDatastore`:

```nim
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
  ## Get a block from the blockstore
  ##

  if address.leaf:
    self.getBlock(address.treeCid, address.index)
  else:
    self.getBlock(address.cid)
```

Since `leaf` is `false` here, the simpler `getBlock` variant is used:

```nim
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
  ## Get a block from the blockstore
  ##

  logScope:
    cid = cid

  if cid.isEmpty:
    trace "Empty block, ignoring"
    return cid.emptyBlock

  without key =? makePrefixKey(self.postFixLen, cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err)

  without data =? await self.repoDs.get(key), err:
    if not (err of DatastoreKeyNotFound):
      trace "Error getting block from datastore", err = err.msg, key
      return failure(err)

    return failure(newException(BlockNotFoundError, err.msg))

  trace "Got block for cid", cid
  return Block.new(cid, data, verify = true)
```

If the block is not in the `localStore`, we try to get it from the network with `self.engine.requestBlock(address)`:

```nim
proc requestBlock*(
    b: BlockExcEngine, address: BlockAddress
): Future[?!Block] {.async.} =
  let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)

  if not b.pendingBlocks.isInFlight(address):
    let peers = b.peers.getPeersForBlock(address)

    if peers.with.len == 0:
      b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
    else:
      let selected = pickPseudoRandom(address, peers.with)
      asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
      b.pendingBlocks.setInFlight(address)
      await b.sendWantBlock(@[address], selected)

    await b.sendWantHave(@[address], peers.without)

  # Don't let timeouts bubble up. We can't be too broad here or we break
  # cancellations.
  try:
    success await blockFuture
  except AsyncTimeoutError as err:
    failure err
```

If no known peer has the block, a discovery request is queued. Otherwise, one pseudo-randomly selected peer among those that have it receives a want-block request. In both cases, want-have requests are sent to the peers in `peers.without`. This is also where the [[Codex WantList]] topic becomes relevant (and perhaps also the [[Codex Block Exchange Protocol]]).

Once we have the manifest, we create a stream through which the data is streamed down to the browser:

```nim
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
```

The stream abstraction provides a `readOnce` method, which retrieves the blocks from the `networkStore` and returns the requested bytes through the stream. `readOnce` is called from `node.retrieveCid`.
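
The offset arithmetic behind a `readOnce`-style method can be sketched as follows. A byte position in the dataset maps to block `offset div blockSize` and to position `offset mod blockSize` inside that block. A single call may span several blocks but never reads past the original dataset size, which is what `datasetSize` models here for `pad = false`. `ToyStoreStream`, `BlockSource` and `fetch` are hypothetical. The real `StoreStream` is asynchronous and fetches blocks through the `networkStore`, probably addressing them as leaves (`treeCid` plus `index`). Its exact code is not shown in this note.

```nim
type
  # Hypothetical stand-in for fetching block `index` of the dataset
  # (in Codex this goes through the networkStore).
  BlockSource = proc (index: int): seq[byte]

  ToyStoreStream = object
    blockSize: int       # block size from the manifest
    datasetSize: int     # original content size in bytes (no padding)
    offset: int          # current read position in the dataset
    getBlock: BlockSource

proc atEof(s: ToyStoreStream): bool = s.offset >= s.datasetSize

proc readOnce(s: var ToyStoreStream, dest: var seq[byte], nbytes: int): int =
  ## Append up to `nbytes` bytes from the current position to `dest`,
  ## fetching one block at a time; returns the number of bytes read.
  while result < nbytes and not s.atEof:
    let blockIndex = s.offset div s.blockSize
    let blockOffset = s.offset mod s.blockSize
    let blk = s.getBlock(blockIndex)
    # Limited by the request, the rest of this block, and the end of the data.
    let n = min(nbytes - result,
                min(s.blockSize - blockOffset, s.datasetSize - s.offset))
    for i in 0 ..< n:
      dest.add blk[blockOffset + i]
    result += n
    s.offset += n

const content = "abcdefghij"  # 10 bytes -> blocks "abcd", "efgh", "ij"

proc fetch(index: int): seq[byte] =
  ## Hypothetical block source: slices `content` into 4-byte blocks.
  let first = index * 4
  for c in content[first ..< min(first + 4, content.len)]:
    result.add byte(c)

when isMainModule:
  var stream = ToyStoreStream(blockSize: 4, datasetSize: content.len, getBlock: fetch)
  var buf: seq[byte]
  while not stream.atEof:
    echo "read ", stream.readOnce(buf, 3), " bytes"
```

Reading 6 bytes from offset 0 takes all of block 0 and the first two bytes of block 1. Once the stream reaches the dataset size, `readOnce` returns 0, which a caller such as `node.retrieveCid` can treat as the end of the stream.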