# Implementing Codex BitTorrent extension
In supporting BitTorrent on the Codex network, it is important to clarify the pre-conditions: what do we expect to have as an input, and what will be the output?

BitTorrent itself can have two types of inputs:

- a `.torrent` manifest file: a b-encoded [[BitTorrent metadata files|metadata file]]; the format differs between torrent version 1 and version 2
- a magnet link: introduced in [[BEP9 - Extension for Peers to Send Metadata Files]] to support trackerless torrents using the DHT for peer discovery

In both cases there are differences between version 1 and version 2 of the metadata files (see [[BitTorrent metadata files]] for details) and between version 1 and version 2 of [[Magnet Links|magnet links]].

A torrent file provides a complete description of the torrent and can be used to compute the corresponding `info` hash.

Thus, while uploading (or seeding) BitTorrent content to the Codex network, the input is the content itself, while the output will be a (hybrid) magnet link.

To retrieve previously seeded content, the input can be a torrent file, a magnet link, or directly an info hash (either v1 or v2, tagged or untagged).

This is illustrated in the following picture:

![[BitTorrent-Upload-Download.svg]]

Thus, from the implementation perspective, the actual input to the Codex network while retrieving previously uploaded content is its `info` hash.

### Uploading BitTorrent Content to Codex

For the time being we only support version 1 and only single-file content (support for directories and for version 2 is work in progress). As a side note, limiting the description to this much simplified version helps to emphasize the important implementation challenges without being distracted by technicalities related to handling multiple files and folders.

Thus, let's assume we have a single input file: `data40k.bin`. It is a binary file of size `40KiB` (`40960` bytes).
We will be using a `16KiB` (`16384` bytes) block size and, as is common for such small content, a `piece length` of `256KiB` (`262144` bytes).

Let's go step by step through the code base to understand the upload process and the related challenges.

First, the upload API:

```
/api/codex/v1/torrent
```

To upload the content we can use the following `POST` request:

```bash
curl -X POST \
  http://localhost:8001/api/codex/v1/torrent \
  -H 'Content-Type: application/octet-stream' \
  -H 'Content-Disposition: filename="data40k.bin"' \
  -w '\n' \
  -T data40k.bin
```

We use the `Content-Disposition` header to indicate the name we want to use for the uploaded content.

This will land in the API handler in `codex/rest/api.nim`:

```nim
router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse:
  ## Upload a file in a streaming manner
  ##
```

It will call `node.storeTorrent` to effectively upload the content and get the resulting `info` (multi)hash back:

```nim
without infoHash =? (
  await node.storeTorrent(
    AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
    filename = filename,
    mimetype = mimetype,
  )
), error:
  error "Error uploading file", exc = error.msg
  return RestApiResponse.error(Http500, error.msg)
```

This brings us to `node.storeTorrent` in `codex/node.nim`:

```nim
proc storeTorrent*(
    self: CodexNodeRef,
    stream: LPStream,
    filename: ?string = string.none,
    mimetype: ?string = string.none,
): Future[?!MultiHash] {.async.} =
  info "Storing BitTorrent data"

  without bitTorrentManifest =?
    await self.storePieces(
      stream, filename = filename, mimetype = mimetype, blockSize = BitTorrentBlockSize
    ):
    return failure("Unable to store BitTorrent data")

  trace "Created BitTorrent manifest", bitTorrentManifest = $bitTorrentManifest

  let infoBencoded = bencode(bitTorrentManifest.info)

  trace "BitTorrent Info successfully bencoded"

  without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err:
    return failure(err)

  trace "computed info hash", infoHash = $infoHash

  without manifestBlk =? await self.storeBitTorrentManifest(
    bitTorrentManifest, infoHash
  ), err:
    error "Unable to store manifest"
    return failure(err)

  info "Stored BitTorrent data",
    infoHash = $infoHash, codexManifestCid = bitTorrentManifest.codexManifestCid

  success infoHash
```

It starts with `self.storePieces`, which returns a [[BitTorrent Manifest]]. The manifest contains the BitTorrent `info` dictionary and the corresponding Codex manifest cid:

```nim
type
  BitTorrentInfo* = ref object
    length*: uint64
    pieceLength*: uint32
    pieces*: seq[MultiHash]
    name*: ?string

  BitTorrentManifest* = ref object
    info*: BitTorrentInfo
    codexManifestCid*: Cid
```

`storePieces` does a very similar job to the `store` proc used for regular Codex content, but additionally it computes the *piece hashes*, creates the `info` dictionary, and finally returns the corresponding `BitTorrentManifest`.

Back in `storeTorrent`, we *b-encode* the `info` dictionary and compute its hash (a multihash). This `info` (multi)hash is what we will use to announce the content on the Codex DHT (see [[Announcing BitTorrent Content on Codex DHT]]).
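To make the b-encoding and `info`-hashing step concrete, here is a minimal sketch in Python (the concepts are language-agnostic; the actual implementation is the Nim code above). It b-encodes a v1-style single-file `info` dictionary whose field values mirror our `data40k.bin` example (the piece hash itself is a placeholder) and computes the SHA-1 `info` hash:

```python
import hashlib

def bencode(obj) -> bytes:
    # Minimal b-encoding: integers, byte strings, lists, and dictionaries.
    # Dictionary keys must be sorted as raw byte strings (per BEP 3).
    if isinstance(obj, int):
        return b"i" + str(obj).encode() + b"e"
    if isinstance(obj, bytes):
        return str(len(obj)).encode() + b":" + obj
    if isinstance(obj, str):
        return bencode(obj.encode())
    if isinstance(obj, list):
        return b"l" + b"".join(bencode(x) for x in obj) + b"e"
    if isinstance(obj, dict):
        out = b"d"
        for k in sorted(obj):  # keys sorted as raw byte strings
            out += bencode(k) + bencode(obj[k])
        return out + b"e"
    raise TypeError(f"cannot bencode {type(obj)}")

# A v1 single-file info dictionary for our 40 KiB example: the content is
# smaller than one 256 KiB piece, so `pieces` holds a single 20-byte digest.
piece_hash = hashlib.sha1(b"\x00" * 40960).digest()  # placeholder piece hash
info = {
    b"length": 40960,
    b"name": b"data40k.bin",
    b"piece length": 262144,
    b"pieces": piece_hash,
}

info_hash = hashlib.sha1(bencode(info)).digest()
print(info_hash.hex())
```

This is what `bencode(bitTorrentManifest.info)` followed by `MultiHash.digest($Sha1HashCodec, infoBencoded)` achieves above, except that Codex wraps the raw SHA-1 digest in a multihash.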
Finally, `storeBitTorrentManifest` will effectively store the BitTorrent manifest block on the Codex network:

```nim
proc storeBitTorrentManifest*(
    self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash
): Future[?!bt.Block] {.async.} =
  let encodedManifest = manifest.encode()

  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
    trace "Unable to create CID for BitTorrent info hash"
    return failure(error)

  without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false),
    error:
    trace "Unable to create block from manifest"
    return failure(error)

  if err =? (await self.networkStore.putBlock(blk)).errorOption:
    trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg
    return failure(err)

  success blk
```

Some important things happen here. First, notice that in Codex we use `Cid`s to refer to content. This is very handy: when requesting a cid and getting the corresponding data back, we can immediately check whether the content multihash present in the cid matches the computed multihash of the received data. If they do not match, we know immediately that the received block is invalid.

But looking at the code above, a careful reader will immediately spot that we are *cheating* a bit.

We first create a cid (`infoHashCid`) using the precomputed `infoHash`, which we then associate with the `encodedManifest` in the `Block.new` call. Clearly, the `info` hash does not identify our `encodedManifest`: if we computed a hash of the `encodedManifest`, it would not match the precomputed `infoHash`. This is because our BitTorrent manifest is more than just the `info` dictionary: it also contains the cid of the corresponding Codex manifest for our content.

This cid is clearly not a valid cid.
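A toy Python model makes the mismatch explicit (this is not the actual Codex manifest encoding; the concatenation and the cid literal are illustrative stand-ins). A hash computed over the `info` dictionary alone cannot validate a blob that contains the `info` dictionary *plus* extra data:

```python
import hashlib

# Model: the block stored under the info-hash cid is the *encoded manifest*,
# i.e. (roughly) the bencoded info dictionary plus the Codex manifest cid.
info_bencoded = b"d6:lengthi40960e4:name11:data40k.bin12:piece lengthi262144ee"
codex_manifest_cid = b"zDvZRwzm..."  # hypothetical stand-in for a real cid

info_hash = hashlib.sha1(info_bencoded).digest()       # what the cid carries
encoded_manifest = info_bencoded + codex_manifest_cid  # what the block carries

# Standard cid verification hashes the whole block data, which fails:
assert hashlib.sha1(encoded_manifest).digest() != info_hash

# The block can only be validated by first decoding it, re-extracting the
# info dictionary, and hashing that part alone:
assert hashlib.sha1(encoded_manifest[: len(info_bencoded)]).digest() == info_hash
```

This is exactly why `Block.new` is called with `verify = false` above, and why validation has to be deferred to a later, manifest-aware step.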
We could create a valid cid by, for instance, computing a hash over the whole `encodedManifest` and appending it to the precomputed `infoHash` in such a cid. Then, while retrieving the corresponding block back, we could first check that the hash computed over the retrieved data matches the hash of the `encodedManifest` that we included in our cid; then, after reconstructing the BitTorrent manifest from the encoded data, we could b-encode the `info` dictionary from the reconstructed BitTorrent manifest, compute its hash, and compare it with the precomputed `infoHash` included in the cid. This would make the cid valid, but there is a problem with this approach.

In Codex, we use cids as references to blocks in `RepoStore`. Namely, we use cids as inputs to functions like `createBlockExpirationMetadataKey` or `makePrefixKey`. The cid itself is not preserved. The uploader (the seeder) has all the data necessary to create the extended cid described in the paragraph above, but when requesting, the downloader knows only the `info` hash, or potentially the contents of the `.torrent` metadata file. In any case, the downloader does not know the cid of the underlying Codex manifest pointing to the actual data. This means that the downloader is unable to create a full cid with the appended hash of the full `encodedManifest`. It is technically possible to send such an incomplete cid and use it to retrieve the full cid from the uploader's datastore, but we are not making the system any more secure by doing so. The sender can easily send a forged block with a perfectly valid cid, as it has all the information necessary to compute the appended hash, but the receiver, not having access to this information beforehand, will not be able to validate it.

Does this mean that we cannot verify that the received content, identified by the cid of the Codex manifest, matches the requested info hash? No.

Notice that BitTorrent does not use cids.
The BitTorrent protocol operates at the level of pieces, and version 1 of the protocol does not even use inclusion proofs. Yet it does not wait until the whole content is fetched in order to conclude that a piece is genuine.

The `info` dictionary contains the `pieces` attribute, with hashes for all the pieces. Once a piece is aggregated from the underlying `16KiB` blocks, its hash is computed and compared against the corresponding entry in the `pieces` array. And this is exactly what we do in Codex in order to prove that the received data, identified by the cid of the Codex manifest, matches the requested `info` hash.

Moreover, we also validate the received data at the block level, even before being able to validate the complete piece. We get this as a bonus from the Codex protocol, which, together with each data block, also sends the corresponding inclusion proof. Thus, even though at the moment we validate an individual block we do not yet know whether the received data, identified by the cid of the Codex manifest, matches the requested `info` hash, we do already know whether the received data matches the Codex manifest. If it does not, it does not even make sense to aggregate pieces.

Thus, to summarize: while we cannot validate that the received BitTorrent manifest points to valid data by validating the corresponding cid (`infoHashCid`), we do it later in two phases. Let's look at the download flow, starting from the end.
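The piece-level check described above can be sketched as follows. This is a simplified Python model with hypothetical helper names (the real logic lives in the Nim `TorrentPieceValidator`): `16KiB` blocks are aggregated into a piece, whose SHA-1 digest is compared against the corresponding entry of the `pieces` array, using the `data40k.bin` dimensions from our example:

```python
import hashlib

BLOCK_SIZE = 16 * 1024     # 16 KiB Codex block
PIECE_LENGTH = 256 * 1024  # 256 KiB BitTorrent piece

def piece_hashes(content: bytes) -> list[bytes]:
    # What the seeder stores in the info dictionary's `pieces` attribute.
    return [
        hashlib.sha1(content[i : i + PIECE_LENGTH]).digest()
        for i in range(0, len(content), PIECE_LENGTH)
    ]

def validate_piece(blocks: list[bytes], piece_index: int, pieces: list[bytes]) -> bool:
    # Aggregate the 16 KiB blocks of one piece and compare the SHA-1 digest
    # against the expected entry in the `pieces` array.
    return hashlib.sha1(b"".join(blocks)).digest() == pieces[piece_index]

# Our 40 KiB example fits into a single (short, final) piece of three blocks.
content = b"".join(i.to_bytes(4, "big") for i in range(10240))  # 40960 bytes
pieces = piece_hashes(content)
blocks = [content[i : i + BLOCK_SIZE] for i in range(0, len(content), BLOCK_SIZE)]

assert len(blocks) == 3 and len(pieces) == 1
assert validate_piece(blocks, 0, pieces)
assert not validate_piece(blocks[::-1], 0, pieces)  # reordered blocks fail
```

Note that a corrupted or reordered block is only detected once the whole piece is assembled; the block-level inclusion proofs mentioned above are what let Codex reject bad blocks earlier.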
### Downloading BitTorrent Content from Codex

We start from `NetworkPeer.readLoop` (in `codex/blockexchange/network/networkpeer.nim`), where we decode the protocol `Message` with:

```nim
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
```

There, for each data item, we call:

```nim
BlockDelivery.decode(initProtoBuffer(item, maxSize = MaxBlockSize))
```

and this is where we get the cid, `Block`, `BlockAddress`, and the corresponding `proof` (for regular data, or *leaf*, blocks):

```nim
proc decode*(_: type BlockDelivery, pb: ProtoBuffer): ProtoResult[BlockDelivery] =
  var
    value = BlockDelivery()
    dataBuf = newSeq[byte]()
    cidBuf = newSeq[byte]()
    cid: Cid
    ipb: ProtoBuffer

  if ?pb.getField(1, cidBuf):
    cid = ?Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob)
  if ?pb.getField(2, dataBuf):
    value.blk =
      ?Block.new(cid, dataBuf, verify = true).mapErr(x => ProtoError.IncorrectBlob)
  if ?pb.getField(3, ipb):
    value.address = ?BlockAddress.decode(ipb)

  if value.address.leaf:
    var proofBuf = newSeq[byte]()
    if ?pb.getField(4, proofBuf):
      let proof = ?CodexProof.decode(proofBuf).mapErr(x => ProtoError.IncorrectBlob)
      value.proof = proof.some
    else:
      value.proof = CodexProof.none
  else:
    value.proof = CodexProof.none

  ok(value)
```

We see that, while constructing the instance of `Block`, we already request block validation by setting `verify = true`:

```nim
proc new*(
    T: type Block, cid: Cid, data: openArray[byte], verify: bool = true
): ?!Block =
  ## creates a new block for both storage and network IO
  ##

  without isTorrent =?
      cid.isTorrentCid, err:
    return "Unable to determine if cid is torrent info hash".failure

  # info hash cids are "fake cids" - they will not validate
  # info hash validation is done outside of the cid itself
  if verify and not isTorrent:
    let
      mhash = ?cid.mhash.mapFailure
      computedMhash = ?MultiHash.digest($mhash.mcodec, data).mapFailure
      computedCid = ?Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure
    if computedCid != cid:
      return "Cid doesn't match the data".failure

  return Block(cid: cid, data: @data).success
```

Here we see that, because (as explained above) the cids corresponding to BitTorrent manifest blocks cannot be immediately validated, we make sure that the validation is skipped here for torrent cids.

Once the `Message` is decoded, back in `NetworkPeer.readLoop` it is passed to `NetworkPeer.handler`, which is set to `Network.rpcHandler` when the instance of `NetworkPeer` is created in `Network.getOrCreatePeer`. For block deliveries, `Network.rpcHandler` forwards `msg.payload` (a `seq[BlockDelivery]`) to `Network.handleBlocksDelivery`, which in turn calls `Network.handlers.onBlocksDelivery`. `Network.handlers.onBlocksDelivery` is set by the constructor of `BlockExcEngine`. Thus, at the end of its journey, the `seq[BlockDelivery]` from `msg.payload` ends up in `BlockExcEngine.blocksDeliveryHandler`. This is where the data blocks are further validated against the inclusion proofs, and where the validated data (*leaf*) blocks and the non-data blocks (non-*leaf*, e.g. a BitTorrent or Codex manifest block) are stored in the `localStore` and then *resolved* against pending blocks via `BlockExcEngine.resolveBlocks`, which calls `pendingBlocks.resolve(blocksDelivery)` (`PendingBlocksManager`).
This is where `blockReq.handle.complete(bd.blk)` is called on the matching pending blocks, which completes the future awaited in `BlockExcEngine.requestBlock`, which in turn completes the future awaited in `NetworkStore.getBlock`: `await self.engine.requestBlock(address)`. And `NetworkStore.getBlock` is awaited either in `CodexNodeRef.fetchPieces` for data blocks, or in `CodexNodeRef.fetchTorrentManifest`.

So, how do we get to `CodexNodeRef.fetchPieces` and `CodexNodeRef.fetchTorrentManifest` in the download flow?

It starts with the API handler of `/api/codex/v1/torrent/{infoHash}/network/stream`:

```nim
router.api(MethodGet, "/api/codex/v1/torrent/{infoHash}/network/stream") do(
  infoHash: MultiHash, resp: HttpResponseRef
) -> RestApiResponse:
  var headers = buildCorsHeaders("GET", allowedOrigin)

  without infoHash =? infoHash.mapFailure, error:
    return RestApiResponse.error(Http400, error.msg, headers = headers)

  if infoHash.mcodec != Sha1HashCodec:
    return RestApiResponse.error(
      Http400, "Only torrents version 1 are currently supported!", headers = headers
    )

  if corsOrigin =? allowedOrigin:
    resp.setCorsHeaders("GET", corsOrigin)
    resp.setHeader("Access-Control-Headers", "X-Requested-With")

  trace "torrent requested: ", multihash = $infoHash

  await node.retrieveInfoHash(infoHash, resp = resp)
```

`CodexNodeRef.retrieveInfoHash` first tries to fetch the `Torrent` object, which consists of the `torrentManifest` and the `codexManifest`. To get it, it calls `node.retrieveTorrent(infoHash)` with the `infoHash` as the argument. And then in `retrieveTorrent` we get to the above-mentioned `fetchTorrentManifest`:

```nim
proc retrieveTorrent*(
    self: CodexNodeRef, infoHash: MultiHash
): Future[?!Torrent] {.async.} =
  without infoHashCid =?
      Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
    trace "Unable to create CID for BitTorrent info hash"
    return failure(error)

  without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
    trace "Unable to fetch Torrent Manifest"
    return failure(err)

  without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
    err:
    trace "Unable to fetch Codex Manifest for torrent info hash"
    return failure(err)

  success (torrentManifest: torrentManifest, codexManifest: codexManifest)
```

We first create the `infoHashCid`, using only the precomputed `infoHash`, and we pass it to `fetchTorrentManifest`:

```nim
proc fetchTorrentManifest*(
    self: CodexNodeRef, infoHashCid: Cid
): Future[?!BitTorrentManifest] {.async.} =
  if err =? infoHashCid.isTorrentInfoHash.errorOption:
    return failure "CID has invalid content type for torrent info hash {$cid}"

  trace "Retrieving torrent manifest for infoHashCid", infoHashCid

  without blk =? await self.networkStore.getBlock(BlockAddress.init(infoHashCid)), err:
    trace "Error retrieve manifest block", infoHashCid, err = err.msg
    return failure err

  trace "Successfully retrieved torrent manifest with given block cid",
    cid = blk.cid, infoHashCid
  trace "Decoding torrent manifest"

  without torrentManifest =? BitTorrentManifest.decode(blk), err:
    trace "Unable to decode torrent manifest", err = err.msg
    return failure("Unable to decode torrent manifest")

  trace "Decoded torrent manifest", infoHashCid, torrentManifest = $torrentManifest

  without isValid =?
      torrentManifest.validate(infoHashCid), err:
    trace "Error validating torrent manifest", infoHashCid, err = err.msg
    return failure(err.msg)

  if not isValid:
    trace "Torrent manifest does not match torrent info hash", infoHashCid
    return failure "Torrent manifest does not match torrent info hash {$infoHashCid}"

  return torrentManifest.success
```

Here we await `networkStore.getBlock`, which will be completed by the block delivery flow we described at the beginning of this section. We restore the `BitTorrentManifest` object using `BitTorrentManifest.decode(blk)`, and then we validate that the `info` dictionary from the received BitTorrent manifest matches the requested `infoHash`:

```nim
without isValid =? torrentManifest.validate(infoHashCid), err:
  trace "Error validating torrent manifest", infoHashCid, err = err.msg
  return failure(err.msg)
```

With this we know that we have a genuine `info` dictionary.

Now, we still need to get and validate the actual data.

The BitTorrent manifest includes the cid of the Codex manifest in the `codexManifestCid` attribute. Back in `retrieveTorrent`, we now fetch the Codex manifest, and we return both manifests to `retrieveInfoHash`, where the download effectively started.

We are ready to start the streaming operation. The illustration below shows a high-level conceptual overview of the streaming process:

![[Codex BitTorrent Streaming.svg]]

From the diagram above we see that there are two concurrent tasks: a *prefetch* task fetching the blocks from the network store, aggregating and validating the pieces, and a *streaming* task sending the blocks down to the client via the REST API.
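The coordination between the two tasks can be modeled with a small asyncio sketch (hypothetical names, greatly simplified with respect to the Nim code): the prefetch task validates one piece at a time and signals its index on a queue, while the streaming task waits for that signal before emitting the piece's data, and a sentinel of `-1` means "stop":

```python
import asyncio
import hashlib

PIECES = [b"piece-0" * 100, b"piece-1" * 100]
EXPECTED = [hashlib.sha1(p).digest() for p in PIECES]  # from the info dictionary

async def prefetch(validated: asyncio.Queue) -> None:
    # Fetch and validate pieces in order; signal each validated piece index,
    # then -1 to indicate the end of the stream (or a validation failure).
    for index, piece in enumerate(PIECES):
        await asyncio.sleep(0)  # stands in for networkStore.getBlock(...)
        if hashlib.sha1(piece).digest() != EXPECTED[index]:
            await validated.put(-1)
            return
        await validated.put(index)
    await validated.put(-1)

async def stream(validated: asyncio.Queue) -> bytes:
    # Only stream data that belongs to an already-validated piece.
    out = b""
    while True:
        index = await validated.get()  # models waitForNextPiece()
        if index == -1:
            return out
        out += PIECES[index]

async def main() -> bytes:
    queue: asyncio.Queue = asyncio.Queue()
    _, data = await asyncio.gather(prefetch(queue), stream(queue))
    return data

data = asyncio.run(main())
assert data == b"".join(PIECES)
```

The real implementation replaces the queue with the `TorrentPieceValidator`'s confirm/wait pair and streams `16KiB` blocks rather than whole pieces, but the producer/consumer shape is the same.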
The prefetch task is started by `retrieveInfoHash` calling `streamTorrent`, passing both manifests and the piece validator as arguments:

```nim
let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)

let stream =
  await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)
```

Let's take a look at `streamTorrent`:

```nim
proc streamTorrent*(
    self: CodexNodeRef,
    torrentManifest: BitTorrentManifest,
    codexManifest: Manifest,
    pieceValidator: TorrentPieceValidator,
): Future[LPStream] {.async: (raises: []).} =
  trace "Retrieving pieces from torrent"
  let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))

  proc prefetch(): Future[void] {.async: (raises: []).} =
    try:
      if err =? (
        await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)
      ).errorOption:
        error "Unable to fetch blocks", err = err.msg
        await noCancel pieceValidator.cancel()
        await noCancel stream.close()
    except CancelledError:
      trace "Prefetch cancelled"

  let prefetchTask = prefetch()

  # Monitor stream completion and cancel background jobs when done
  proc monitorStream() {.async: (raises: []).} =
    try:
      await stream.join()
    except CancelledError:
      trace "Stream cancelled"
    finally:
      await noCancel prefetchTask.cancelAndWait

  self.trackedFutures.track(monitorStream())

  trace "Creating store stream for torrent manifest"
  stream
```

`streamTorrent` does two things: it starts the background `prefetch` task, and
monitors the stream using `monitorStream`.

The `prefetch` job calls `fetchPieces`:

```nim
proc fetchPieces*(
    self: CodexNodeRef,
    torrentManifest: BitTorrentManifest,
    codexManifest: Manifest,
    pieceValidator: TorrentPieceValidator,
): Future[?!void] {.async: (raises: [CancelledError]).} =
  let cid = codexManifest.treeCid
  let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece
  let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
  while not blockIter.finished:
    let blockFutures = collect:
      for i in 0 ..< numOfBlocksPerPiece:
        if not blockIter.finished:
          let address = BlockAddress.init(cid, blockIter.next())
          self.networkStore.getBlock(address)

    without blocks =? await allFinishedValues(blockFutures), err:
      return failure(err)

    if err =? self.validatePiece(pieceValidator, blocks).errorOption:
      return failure(err)

    await sleepAsync(1.millis)

  success()
```

We fetch blocks in *batches*, or rather *in pieces*. We trigger fetching the blocks with `self.networkStore.getBlock(address)`, which will resolve by either getting the block from the local store or from the network through the block delivery flow described earlier.

Notice that we need to get all the relevant blocks here, not only the blocks that are not yet in the local store. This is necessary because we need all the blocks in a piece so that we can validate the piece and potentially stop streaming if the piece turns out to be invalid.

Before calling `validatePiece`, where the validation takes place, we wait for all the `Futures` to complete, returning the requested `blocks`.

`validatePiece` is defined as follows:

```nim
proc validatePiece(
    self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block]
): ?!void {.raises: [].} =
  trace "Fetched complete torrent piece - verifying..."
  let pieceIndex = pieceValidator.validatePiece(blocks)

  if pieceIndex < 0:
    error "Piece verification failed", pieceIndex = pieceIndex
    return failure(fmt"Piece verification failed for {pieceIndex=}")

  trace "Piece successfully verified", pieceIndex

  let confirmedPieceIndex = pieceValidator.confirmCurrentPiece()

  if pieceIndex != confirmedPieceIndex:
    error "Piece confirmation failed",
      pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex
    return
      failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}")
  success()
```

It first calls `validatePiece` on the `pieceValidator`, which computes the SHA-1 hash of the concatenated blocks and checks whether it matches the (multi)hash from the `info` dictionary.

> [!info]
> This constitutes the second validation step: after we have checked that the `info` dictionary matches the requested `info` hash in the first step described above, here we are making sure that the received content matches the metadata in the `info` dictionary, and thus that it is indeed the content identified by the `info` hash from the request.

`TorrentPieceValidator` maintains internal state so that it knows which piece is expected at any given moment; this is why it does not need a piece index argument to validate the blocks. Upon successful validation it returns the index of the validated piece. We then call `pieceValidator.confirmCurrentPiece` to *notify* the REST API streaming task, which is awaiting `torrentPieceValidator.waitForNextPiece()` before streaming the validated blocks to the requesting client:

```nim
proc retrieveInfoHash(
    node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef
): Future[void] {.async.} =
  ## Download torrent from the node in a streaming
  ## manner
  ##
  var stream: LPStream

  var bytes = 0
  try:
    without torrent =?
        (await node.retrieveTorrent(infoHash)), err:
      error "Unable to fetch Torrent Metadata", err = err.msg
      resp.status = Http404
      await resp.sendBody(err.msg)
      return
    let (torrentManifest, codexManifest) = torrent

    if codexManifest.mimetype.isSome:
      resp.setHeader("Content-Type", codexManifest.mimetype.get())
    else:
      resp.addHeader("Content-Type", "application/octet-stream")

    if codexManifest.filename.isSome:
      resp.setHeader(
        "Content-Disposition",
        "attachment; filename=\"" & codexManifest.filename.get() & "\"",
      )
    else:
      resp.setHeader("Content-Disposition", "attachment")

    await resp.prepareChunked()

    let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)

    let stream =
      await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)

    while not stream.atEof:
      trace "Waiting for piece..."
      let pieceIndex = await torrentPieceValidator.waitForNextPiece()

      if -1 == pieceIndex:
        warn "No more torrent pieces expected. TorrentPieceValidator might be out of sync!"
        break

      trace "Got piece", pieceIndex

      let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator()
      while not blocksPerPieceIter.finished and not stream.atEof:
        var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
        # wait for the next piece to be prefetched
        let len = await stream.readOnce(addr buff[0], buff.len)

        buff.setLen(len)
        if buff.len <= 0:
          break

        bytes += buff.len

        await resp.sendChunk(addr buff[0], buff.len)
        discard blocksPerPieceIter.next()
    await resp.finish()
    codex_api_downloads.inc()
  except CancelledError as exc:
    info "Stream cancelled", exc = exc.msg
    raise exc
  except CatchableError as exc:
    warn "Error streaming blocks", exc = exc.msg
    resp.status = Http500
    if resp.isPending():
      await resp.sendBody(exc.msg)
  finally:
    info "Sent bytes for torrent", infoHash = $infoHash, bytes
    if not stream.isNil:
      await stream.close()
```

Now, two important points. First, when the streaming happens to be interrupted, the stream will be closed in the `finally` block. This in turn will be detected by `monitorStream` in `streamTorrent`, causing the `prefetch` job to be cancelled. Second, when either the piece validation fails, or any of the awaited `getBlock` futures fails, `prefetch` will return an error, which will cause the stream to be closed:

```nim
proc prefetch(): Future[void] {.async: (raises: []).} =
  try:
    if err =?
    (
      await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
    ).errorOption:
      error "Unable to fetch blocks", err = err.msg
      await noCancel pieceValidator.cancel()
      await noCancel stream.close()
  except CancelledError:
    trace "Prefetch cancelled"
```

Without this detection mechanism, we would either continue fetching blocks even when the streaming API request has been interrupted, or we would continue streaming even when it is already known that the piece validation phase has failed. This could result in potentially invalid content being returned to the client. After any failure in the `prefetch` job, the pieces will no longer be validated, and thus it does not make any sense to continue the streaming operation.

The `stream.readOnce` called in the streaming loop is implemented in `StoreStream`, which uses the same underlying `networkStore` that is also used in the `fetchPieces` proc shown above. It calls that same `getBlock` operation: if the block is already in the local store (either because it was there before or as a result of the prefetch operation), it is returned immediately; otherwise it is requested from the block exchange engine via the `BlockExcEngine.requestBlock` operation. In case there is already a pending request for the given block address, the `PendingBlocksManager` will return the existing block handle, so that `BlockExcEngine.requestBlock` will not cause a duplicate request. It may, however, potentially return an invalid block to the client before the containing piece has been validated in the prefetch phase.

If you want to experiment with uploading and downloading BitTorrent content yourself, check [[Upload and download BitTorrent content with Codex - demo]].