# Implementing Codex BitTorrent extension
In supporting BitTorrent on the Codex network, it is important to clarify the pre-conditions: what do we expect to have as an input, and what will be the output.

BitTorrent itself can have two types of inputs:

- a `.torrent` manifest file - a b-encoded [[BitTorrent metadata files|metadata file]], with different formats for torrent version 1 and version 2
- a magnet link - introduced in [[BEP9 - Extension for Peers to Send Metadata Files]] to support trackerless torrents, using the DHT for peer discovery

In both cases there are differences between version 1 and version 2 of the metadata files (see [[BitTorrent metadata files]] for details), and between version 1 and version 2 of [[Magnet Links|magnet links]].

A torrent file provides a complete description of the torrent and can be used to compute the corresponding `info` hash.

Thus, while uploading (or seeding) BitTorrent content to the Codex network, the input is the content itself, while the output will be a (hybrid) magnet link.

To retrieve previously seeded content, the input can be a torrent file, a magnet link, or directly an info hash (either v1 or v2, tagged or untagged).

This is illustrated in the following figure:

![[BitTorrent-Upload-Download.svg]]

Thus, from the implementation perspective, the actual input to the Codex network while retrieving previously uploaded content is its `info` hash.

### Uploading BitTorrent Content to Codex

For the time being we only support version 1 and only single-file content (supporting directories and version 2 is work in progress). As a side note, limiting the description to this much simplified case helps to emphasize the important implementation challenges without being distracted by technicalities related to handling multiple files and folders.

Thus, let's assume we have a single input file: `data40k.bin`. It is a binary file of size `40KiB` (`40960` bytes). We will be using a block size of `16KiB` (`16384` bytes) and a `piece length` of `256KiB` (`262144` bytes), commonly used for such small content.

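To make these numbers concrete, here is a small Python sketch (illustrative only, not part of the Codex codebase) computing how many Codex blocks and BitTorrent pieces this file occupies:

```python
import math

FILE_SIZE = 40_960      # data40k.bin: 40 KiB
BLOCK_SIZE = 16_384     # Codex block size: 16 KiB
PIECE_LENGTH = 262_144  # BitTorrent piece length: 256 KiB

# Codex splits the content into fixed-size blocks; the last one may be short.
num_blocks = math.ceil(FILE_SIZE / BLOCK_SIZE)    # 3 blocks: 16 KiB, 16 KiB, 8 KiB

# With a 256 KiB piece length, the whole 40 KiB file fits in a single piece,
# so the `pieces` array in the info dictionary holds exactly one SHA1 hash.
num_pieces = math.ceil(FILE_SIZE / PIECE_LENGTH)  # 1 piece

blocks_per_piece = PIECE_LENGTH // BLOCK_SIZE     # 16 blocks per (full) piece
print(num_blocks, num_pieces, blocks_per_piece)   # 3 1 16
```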
Let's go step by step through the code base to understand the upload process and the related challenges.

First, the upload API:

```
/api/codex/v1/torrent
```

To upload the content we can use the following `POST` request:

```bash
curl -X POST \
  http://localhost:8001/api/codex/v1/torrent \
  -H 'Content-Type: application/octet-stream' \
  -H 'Content-Disposition: filename="data40k.bin"' \
  -w '\n' \
  -T data40k.bin
```

We use the `Content-Disposition` header to indicate the name we want to use for the uploaded content.

This will land in the API handler in `codex/rest/api.nim`:

```nim
router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse:
    ## Upload a file in a streaming manner
    ##
```

It will call `node.storeTorrent` to effectively upload the content and get the resulting `info` (multi)hash back:

```nim
without infoHash =? (
  await node.storeTorrent(
    AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
    filename = filename,
    mimetype = mimetype,
  )
), error:
  error "Error uploading file", exc = error.msg
  return RestApiResponse.error(Http500, error.msg)
```

This brings us to `node.storeTorrent` in `codex/node.nim`:

```nim
proc storeTorrent*(
    self: CodexNodeRef,
    stream: LPStream,
    filename: ?string = string.none,
    mimetype: ?string = string.none,
): Future[?!MultiHash] {.async.} =
  info "Storing BitTorrent data"

  without bitTorrentManifest =?
    await self.storePieces(
      stream, filename = filename, mimetype = mimetype, blockSize = BitTorrentBlockSize
    ):
    return failure("Unable to store BitTorrent data")

  trace "Created BitTorrent manifest", bitTorrentManifest = $bitTorrentManifest

  let infoBencoded = bencode(bitTorrentManifest.info)

  trace "BitTorrent Info successfully bencoded"

  without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err:
    return failure(err)

  trace "computed info hash", infoHash = $infoHash

  without manifestBlk =? await self.storeBitTorrentManifest(
    bitTorrentManifest, infoHash
  ), err:
    error "Unable to store manifest"
    return failure(err)

  info "Stored BitTorrent data",
    infoHash = $infoHash, codexManifestCid = bitTorrentManifest.codexManifestCid

  success infoHash
```

It starts with `self.storePieces`, which returns a [[BitTorrent Manifest]]. A manifest contains the BitTorrent `info` dictionary and the corresponding Codex manifest cid:

```nim
type
  BitTorrentInfo* = ref object
    length*: uint64
    pieceLength*: uint32
    pieces*: seq[MultiHash]
    name*: ?string

  BitTorrentManifest* = ref object
    info*: BitTorrentInfo
    codexManifestCid*: Cid
```

`storePieces` does a job very similar to the `store` proc used for regular Codex content, but additionally it computes the *piece hashes*, creates the `info` dictionary, and finally returns the corresponding `BitTorrentManifest`.

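Conceptually, v1 piece hashing reduces to SHA1 over consecutive `piece length`-sized chunks of the content. A minimal Python sketch (the function name is illustrative, not an actual Codex proc):

```python
import hashlib

def compute_piece_hashes(data: bytes, piece_length: int) -> list[bytes]:
    """Return the SHA1 digest of each piece, as in BitTorrent v1."""
    return [
        hashlib.sha1(data[i : i + piece_length]).digest()
        for i in range(0, len(data), piece_length)
    ]

data = bytes(40_960)  # stand-in for the contents of data40k.bin
pieces = compute_piece_hashes(data, 262_144)
assert len(pieces) == 1                # the whole file fits in one piece
assert pieces[0] == hashlib.sha1(data).digest()
```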
Back in `storeTorrent`, we *b-encode* the `info` dictionary and compute its hash (multihash). This `info` (multi)hash is what we will use to announce the content on the Codex DHT (see [[Announcing BitTorrent Content on Codex DHT]]).

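The `info` hash is simply the SHA1 digest of the canonical bencoding of the `info` dictionary (dictionary keys sorted lexicographically). A self-contained sketch of both steps, assuming a v1 single-file dictionary (not the Codex implementation):

```python
import hashlib

def bencode(obj) -> bytes:
    # Minimal bencoder covering only the types used in an info dictionary.
    if isinstance(obj, int):
        return b"i%de" % obj
    if isinstance(obj, bytes):
        return b"%d:%s" % (len(obj), obj)
    if isinstance(obj, str):
        return bencode(obj.encode())
    if isinstance(obj, dict):
        # Keys must be emitted in sorted (raw byte) order.
        items = sorted(
            (k.encode() if isinstance(k, str) else k, v) for k, v in obj.items()
        )
        return b"d" + b"".join(bencode(k) + bencode(v) for k, v in items) + b"e"
    raise TypeError(obj)

info = {
    "length": 40_960,
    "name": "data40k.bin",
    "piece length": 262_144,
    "pieces": hashlib.sha1(bytes(40_960)).digest(),  # one piece -> one 20-byte hash
}
info_hash = hashlib.sha1(bencode(info)).digest()
assert len(info_hash) == 20
assert bencode({"foo": 42}) == b"d3:fooi42ee"
```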
Finally, `storeBitTorrentManifest` will effectively store the BitTorrent manifest block on the Codex network:

```nim
proc storeBitTorrentManifest*(
    self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash
): Future[?!bt.Block] {.async.} =
  let encodedManifest = manifest.encode()

  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
    trace "Unable to create CID for BitTorrent info hash"
    return failure(error)

  without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false),
    error:
    trace "Unable to create block from manifest"
    return failure(error)

  if err =? (await self.networkStore.putBlock(blk)).errorOption:
    trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg
    return failure(err)

  success blk
```

Some important things happen here. First, notice that in Codex we use `Cids` to refer to content. This is very handy: when requesting a cid and getting the corresponding data back, we can immediately check whether the content multihash present in the cid matches the computed multihash of the received data. If they do not match, we know immediately that the received block is invalid.

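This self-certifying property can be shown with a toy sketch (plain SHA-256 in place of the real multihash/CID machinery; names are illustrative):

```python
import hashlib

def make_cid(data: bytes) -> bytes:
    # Toy "cid": just the SHA-256 digest of the content.
    return hashlib.sha256(data).digest()

def verify_block(cid: bytes, data: bytes) -> bool:
    # Recompute the hash of the received data and compare it with the cid.
    return hashlib.sha256(data).digest() == cid

cid = make_cid(b"genuine block")
assert verify_block(cid, b"genuine block")     # matching data validates
assert not verify_block(cid, b"forged block")  # tampered data is rejected
```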
But looking at the code above, a careful reader will immediately spot that we are *cheating* a bit.

We first create a cid (`infoHashCid`) using the precomputed `infoHash`, which we then associate with the `encodedManifest` in the `Block.new` call. Clearly, the `info` hash does not identify our `encodedManifest`: if we computed a hash of the `encodedManifest`, it would not match the precomputed `infoHash`. This is because our BitTorrent manifest is more than just the `info` dictionary: it also contains the cid of the corresponding Codex manifest for our content.

This cid is clearly not a valid cid.

We could create a valid cid by, for instance, computing a hash over the whole `encodedManifest` and appending it to the precomputed `infoHash` in such a cid. Then, while retrieving the corresponding block back, we could first check that the computed hash over the retrieved data matches the hash of the `encodedManifest` included in our cid, and then, after reconstructing the BitTorrent manifest from the encoded data, we could b-encode the `info` dictionary from the reconstructed manifest, compute its hash, and compare it with the precomputed `infoHash` included in the cid. This would make the cid valid, but there is a problem with this approach.

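The scheme described in the paragraph above can be sketched as follows: the extended identifier carries both the `info` hash and a hash of the full encoded manifest, and a party that already knows the manifest can verify both parts. All names here are hypothetical; this is just the idea, not Codex code:

```python
import hashlib

def extended_id(info_bencoded: bytes, encoded_manifest: bytes) -> bytes:
    # info hash (SHA1, as in BitTorrent v1) + hash of the whole manifest blob
    return (hashlib.sha1(info_bencoded).digest()
            + hashlib.sha256(encoded_manifest).digest())

def verify(ext_id: bytes, info_bencoded: bytes, encoded_manifest: bytes) -> bool:
    info_hash, manifest_hash = ext_id[:20], ext_id[20:]
    return (hashlib.sha256(encoded_manifest).digest() == manifest_hash
            and hashlib.sha1(info_bencoded).digest() == info_hash)

info = b"d6:lengthi40960ee"               # stand-in for the bencoded info dictionary
manifest = info + b"|codex-manifest-cid"  # stand-in for the encoded BitTorrent manifest

ext = extended_id(info, manifest)
assert verify(ext, info, manifest)
# A tampered manifest no longer matches the manifest half of the identifier:
assert not verify(ext, info, b"forged manifest")
```

The catch, explained next, is that only the uploader can construct the second half of such an identifier.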
In Codex, we use cids as references to blocks in `RepoStore`. Namely, we use cids as inputs to functions like `createBlockExpirationMetadataKey` or `makePrefixKey`. The cid itself is not preserved. The uploader (the seeder) has all the data necessary to create the extended cid described in the paragraph above, but when requesting, the downloader knows only the `info` hash, or potentially the contents of the `.torrent` metadata file. In any case, the downloader does not know the cid of the underlying Codex manifest pointing to the actual data. This means that the downloader is unable to create a full cid with the appended hash of the full `encodedManifest`. It is technically possible to send such an incomplete cid and use it to retrieve the full cid from the uploader's datastore, but doing so does not make the system any more secure. The sender can easily send a forged block with a perfectly valid cid, as it has all the information necessary to compute the appended hash, but the receiver, not having access to this information beforehand, will not be able to validate it.

Does this mean we can only be sure that the received content identified by the cid of the Codex manifest matches the requested info hash? No.

Notice that BitTorrent does not use cids. The BitTorrent protocol operates at the level of pieces, and version 1 of the protocol does not even use inclusion proofs. It can only conclude that data is genuine once a whole piece has been fetched.

The info dictionary contains the `pieces` attribute, with hashes for all pieces. Once a piece is aggregated from the underlying `16KiB` blocks, its hash is computed and compared against the corresponding entry in the `pieces` array. And this is exactly what we do in Codex in order to prove that the received data, identified by the cid of the Codex manifest, matches the requested `info` hash.

Moreover, we also validate the received data at the block level, even before being able to validate the complete piece. We get this as a bonus from the Codex protocol, which, together with each data block, also sends the corresponding inclusion proof. Thus, even though at the moment we validate an individual block we do not yet know whether the received data, identified by the cid of the Codex manifest, matches the requested `info` hash, we do already know whether the received data matches the Codex manifest. If it does not, it does not even make sense to aggregate pieces.

Thus, to summarize: while we cannot validate whether the received BitTorrent manifest points to valid data by validating the corresponding cid (`infoHashCid`), we do it later in two phases. Let's look at the download flow, starting from the end.

### Downloading BitTorrent Content from Codex

We start from `NetworkPeer.readLoop` (in `codex/blockexchange/network/networkpeer.nim`), where we decode the protocol `Message` with:

```nim
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
```

There, for each data item, we call:

```nim
BlockDelivery.decode(initProtoBuffer(item, maxSize = MaxBlockSize))
```

and this is where we get the cid, `Block`, `BlockAddress`, and the corresponding `proof` (for regular data, or *leaf*, blocks):

```nim
proc decode*(_: type BlockDelivery, pb: ProtoBuffer): ProtoResult[BlockDelivery] =
  var
    value = BlockDelivery()
    dataBuf = newSeq[byte]()
    cidBuf = newSeq[byte]()
    cid: Cid
    ipb: ProtoBuffer

  if ?pb.getField(1, cidBuf):
    cid = ?Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob)
  if ?pb.getField(2, dataBuf):
    value.blk =
      ?Block.new(cid, dataBuf, verify = true).mapErr(x => ProtoError.IncorrectBlob)
  if ?pb.getField(3, ipb):
    value.address = ?BlockAddress.decode(ipb)

  if value.address.leaf:
    var proofBuf = newSeq[byte]()
    if ?pb.getField(4, proofBuf):
      let proof = ?CodexProof.decode(proofBuf).mapErr(x => ProtoError.IncorrectBlob)
      value.proof = proof.some
    else:
      value.proof = CodexProof.none
  else:
    value.proof = CodexProof.none

  ok(value)
```

We see that, while constructing the instance of `Block`, we already request block validation by setting `verify = true`:

```nim
proc new*(
    T: type Block, cid: Cid, data: openArray[byte], verify: bool = true
): ?!Block =
  ## creates a new block for both storage and network IO
  ##

  without isTorrent =? cid.isTorrentCid, err:
    return "Unable to determine if cid is torrent info hash".failure

  # info hash cids are "fake cids" - they will not validate
  # info hash validation is done outside of the cid itself
  if verify and not isTorrent:
    let
      mhash = ?cid.mhash.mapFailure
      computedMhash = ?MultiHash.digest($mhash.mcodec, data).mapFailure
      computedCid = ?Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure
    if computedCid != cid:
      return "Cid doesn't match the data".failure

  return Block(cid: cid, data: @data).success
```

Here we see that, because the cids corresponding to BitTorrent manifest blocks cannot be immediately validated (as explained above), we make sure the validation is skipped for torrent cids.

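The effect of the `verify and not isTorrent` guard can be condensed into a few lines of Python (toy hashes and hypothetical names, not the Codex API):

```python
import hashlib

def validate_block(cid_hash: bytes, data: bytes,
                   is_torrent_cid: bool, verify: bool = True) -> bool:
    # Torrent info-hash cids are "fake": their hash does not cover the manifest
    # bytes, so hash verification is skipped here and done later at a higher level.
    if verify and not is_torrent_cid:
        return hashlib.sha256(data).digest() == cid_hash
    return True

manifest_bytes = b"encoded bittorrent manifest"
fake_cid_hash = hashlib.sha1(b"bencoded info dict").digest()  # does NOT hash manifest_bytes

# A regular block whose hash does not match its cid is rejected...
assert not validate_block(fake_cid_hash, manifest_bytes, is_torrent_cid=False)
# ...but a torrent-manifest block is accepted here and validated later.
assert validate_block(fake_cid_hash, manifest_bytes, is_torrent_cid=True)
```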
Once the `Message` is decoded, back in `NetworkPeer.readLoop` it is passed to `NetworkPeer.handler`, which is set to `Network.rpcHandler` when the instance of `NetworkPeer` is created in `Network.getOrCreatePeer`. For block deliveries, `Network.rpcHandler` forwards `msg.payload` (a `seq[BlockDelivery]`) to `Network.handleBlocksDelivery`, which in turn calls `Network.handlers.onBlocksDelivery`, set by the constructor of `BlockExcEngine`. Thus, at the end of its journey, the `seq[BlockDelivery]` from `msg.payload` ends up in `BlockExcEngine.blocksDeliveryHandler`.

This is where the data blocks are further validated against the inclusion proof. The validated data (*leaf*) blocks and the non-data (non-*leaf*) blocks (e.g. a BitTorrent or Codex manifest block) are stored in the `localStore` and then *resolved* against pending blocks via `BlockExcEngine.resolveBlocks`, which calls `pendingBlocks.resolve(blocksDelivery)` (`PendingBlocksManager`). There, `blockReq.handle.complete(bd.blk)` is called on the matching pending blocks, completing the future awaited in `BlockExcEngine.requestBlock`, which in turn completes the future awaited in `NetworkStore.getBlock` (`await self.engine.requestBlock(address)`). And `NetworkStore.getBlock` was awaited either in `CodexNodeRef.fetchPieces` for data blocks or in `CodexNodeRef.fetchTorrentManifest`.

So, how do we get to `CodexNodeRef.fetchPieces` and `CodexNodeRef.fetchTorrentManifest` in the download flow?

It starts with the API handler for `/api/codex/v1/torrent/{infoHash}/network/stream`:

```nim
router.api(MethodGet, "/api/codex/v1/torrent/{infoHash}/network/stream") do(
    infoHash: MultiHash, resp: HttpResponseRef
  ) -> RestApiResponse:
    var headers = buildCorsHeaders("GET", allowedOrigin)

    without infoHash =? infoHash.mapFailure, error:
      return RestApiResponse.error(Http400, error.msg, headers = headers)

    if infoHash.mcodec != Sha1HashCodec:
      return RestApiResponse.error(
        Http400, "Only torrents version 1 are currently supported!", headers = headers
      )

    if corsOrigin =? allowedOrigin:
      resp.setCorsHeaders("GET", corsOrigin)
      resp.setHeader("Access-Control-Headers", "X-Requested-With")

    trace "torrent requested: ", multihash = $infoHash

    await node.retrieveInfoHash(infoHash, resp = resp)
```

`CodexNodeRef.retrieveInfoHash` first tries to fetch the `Torrent` object, which consists of `torrentManifest` and `codexManifest`. To get it, it calls `node.retrieveTorrent(infoHash)` with the `infoHash` as the argument. And then, in `retrieveTorrent`, we get to the above-mentioned `fetchTorrentManifest`:

```nim
proc retrieveTorrent*(
    self: CodexNodeRef, infoHash: MultiHash
): Future[?!Torrent] {.async.} =
  without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
    trace "Unable to create CID for BitTorrent info hash"
    return failure(error)

  without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
    trace "Unable to fetch Torrent Manifest"
    return failure(err)

  without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
    err:
    trace "Unable to fetch Codex Manifest for torrent info hash"
    return failure(err)

  success (torrentManifest: torrentManifest, codexManifest: codexManifest)
```

We first create `infoHashCid`, using only the precomputed `infoHash`, and we pass it to `fetchTorrentManifest`:

```nim
proc fetchTorrentManifest*(
    self: CodexNodeRef, infoHashCid: Cid
): Future[?!BitTorrentManifest] {.async.} =
  if err =? infoHashCid.isTorrentInfoHash.errorOption:
    return failure "CID has invalid content type for torrent info hash {$cid}"

  trace "Retrieving torrent manifest for infoHashCid", infoHashCid

  without blk =? await self.networkStore.getBlock(BlockAddress.init(infoHashCid)), err:
    trace "Error retrieve manifest block", infoHashCid, err = err.msg
    return failure err

  trace "Successfully retrieved torrent manifest with given block cid",
    cid = blk.cid, infoHashCid
  trace "Decoding torrent manifest"

  without torrentManifest =? BitTorrentManifest.decode(blk), err:
    trace "Unable to decode torrent manifest", err = err.msg
    return failure("Unable to decode torrent manifest")

  trace "Decoded torrent manifest", infoHashCid, torrentManifest = $torrentManifest

  without isValid =? torrentManifest.validate(infoHashCid), err:
    trace "Error validating torrent manifest", infoHashCid, err = err.msg
    return failure(err.msg)

  if not isValid:
    trace "Torrent manifest does not match torrent info hash", infoHashCid
    return failure "Torrent manifest does not match torrent info hash {$infoHashCid}"

  return torrentManifest.success
```

Here we await `networkStore.getBlock`, which will be completed by the block delivery flow described at the beginning of this section. We restore the `BitTorrentManifest` object using `BitTorrentManifest.decode(blk)`, and then we validate that the `info` dictionary from the received BitTorrent manifest matches the requested `infoHash`:

```nim
without isValid =? torrentManifest.validate(infoHashCid), err:
  trace "Error validating torrent manifest", infoHashCid, err = err.msg
  return failure(err.msg)
```

With this we know that we have a genuine `info` dictionary.

Now, we still need to get and validate the actual data.

The BitTorrent manifest includes the cid of the Codex manifest in the `codexManifestCid` attribute. Back in `retrieveTorrent`, we now fetch the Codex manifest, and we return both to `retrieveInfoHash`, where the download effectively started.

We are ready to start the streaming operation. The illustration below shows a high-level conceptual overview of the streaming process:

![[Codex BitTorrent Streaming.svg]]

From the diagram above we see that there are two concurrent tasks: a *prefetch* task, fetching the blocks from the network store and aggregating and validating pieces, and a *streaming* task, sending the blocks down to the client via the REST API.

The prefetch task is started by `retrieveInfoHash` calling `streamTorrent`, passing both manifests and the piece validator as arguments:

```nim
let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)

let stream =
  await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)
```

Let's take a look at `streamTorrent`:

```nim
proc streamTorrent*(
    self: CodexNodeRef,
    torrentManifest: BitTorrentManifest,
    codexManifest: Manifest,
    pieceValidator: TorrentPieceValidator,
): Future[LPStream] {.async: (raises: []).} =
  trace "Retrieving pieces from torrent"
  let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))

  proc prefetch(): Future[void] {.async: (raises: []).} =
    try:
      if err =? (await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)).errorOption:
        error "Unable to fetch blocks", err = err.msg
        await noCancel pieceValidator.cancel()
        await noCancel stream.close()
    except CancelledError:
      trace "Prefetch cancelled"

  let prefetchTask = prefetch()

  # Monitor stream completion and cancel background jobs when done
  proc monitorStream() {.async: (raises: []).} =
    try:
      await stream.join()
    except CancelledError:
      trace "Stream cancelled"
    finally:
      await noCancel prefetchTask.cancelAndWait

  self.trackedFutures.track(monitorStream())

  trace "Creating store stream for torrent manifest"
  stream
```

`streamTorrent` does two things:

1. starts the background `prefetch` task
2. monitors the stream using `monitorStream`

The `prefetch` task calls `fetchPieces`:

```nim
proc fetchPieces*(
    self: CodexNodeRef,
    torrentManifest: BitTorrentManifest,
    codexManifest: Manifest,
    pieceValidator: TorrentPieceValidator,
): Future[?!void] {.async: (raises: [CancelledError]).} =
  let cid = codexManifest.treeCid
  let numOfBlocksPerPiece = pieceValidator.numberOfBlocksPerPiece
  let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
  while not blockIter.finished:
    let blockFutures = collect:
      for i in 0 ..< numOfBlocksPerPiece:
        if not blockIter.finished:
          let address = BlockAddress.init(cid, blockIter.next())
          self.networkStore.getBlock(address)

    without blocks =? await allFinishedValues(blockFutures), err:
      return failure(err)

    if err =? self.validatePiece(pieceValidator, blocks).errorOption:
      return failure(err)

    await sleepAsync(1.millis)

  success()
```

We fetch blocks in *batches*, or rather *in pieces*. We trigger fetching blocks with `self.networkStore.getBlock(address)`, which will resolve either by getting the block from the local store or by fetching it from the network through the block delivery flow described earlier.

Notice that we need to get all the relevant blocks here, not only the blocks that are not yet in the local store. This is necessary because we need all the blocks of a piece so that we can validate the piece and potentially stop streaming if the piece turns out to be invalid.

Before calling `validatePiece`, where validation takes place, we wait for all the `Futures` to complete, returning the requested `blocks`.

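The fetch-a-piece-worth-of-blocks-then-validate loop might look like this in asyncio terms (`get_block` and `validate_piece` are hypothetical stand-ins for the Codex procs, not their actual signatures):

```python
import asyncio
import hashlib

BLOCKS_PER_PIECE = 16

async def get_block(index: int) -> bytes:
    # Stand-in for networkStore.getBlock: returns a deterministic 16 KiB payload.
    return index.to_bytes(4, "big") * 4096

def validate_piece(blocks: list[bytes], expected: bytes) -> bool:
    # SHA1 over the concatenated blocks, compared with the info-dictionary entry.
    return hashlib.sha1(b"".join(blocks)).digest() == expected

async def fetch_pieces(num_blocks: int, piece_hashes: list[bytes]) -> bool:
    for piece_index, start in enumerate(range(0, num_blocks, BLOCKS_PER_PIECE)):
        # Request every block of the piece concurrently, then wait for all of them.
        futs = [get_block(i)
                for i in range(start, min(start + BLOCKS_PER_PIECE, num_blocks))]
        blocks = await asyncio.gather(*futs)
        if not validate_piece(list(blocks), piece_hashes[piece_index]):
            return False  # abort: the stream must not serve an invalid piece
    return True

async def main() -> bool:
    # Build the expected piece hashes for 40 blocks (two full pieces + one short).
    hashes = []
    for start in range(0, 40, BLOCKS_PER_PIECE):
        blocks = [await get_block(i)
                  for i in range(start, min(start + BLOCKS_PER_PIECE, 40))]
        hashes.append(hashlib.sha1(b"".join(blocks)).digest())
    return await fetch_pieces(40, hashes)

assert asyncio.run(main())
```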
`validatePiece` is defined as follows:

```nim
proc validatePiece(
    self: CodexNodeRef, pieceValidator: TorrentPieceValidator, blocks: seq[bt.Block]
): ?!void {.raises: [].} =
  trace "Fetched complete torrent piece - verifying..."
  let pieceIndex = pieceValidator.validatePiece(blocks)

  if pieceIndex < 0:
    error "Piece verification failed", pieceIndex = pieceIndex
    return failure(fmt"Piece verification failed for {pieceIndex=}")

  trace "Piece successfully verified", pieceIndex

  let confirmedPieceIndex = pieceValidator.confirmCurrentPiece()

  if pieceIndex != confirmedPieceIndex:
    error "Piece confirmation failed",
      pieceIndex = pieceIndex, confirmedPieceIndex = confirmedPieceIndex
    return
      failure(fmt"Piece confirmation failed for {pieceIndex=}, {confirmedPieceIndex=}")
  success()
```

It first calls `validatePiece` on the `pieceValidator`, which computes the SHA1 hash of the concatenated blocks and checks whether it matches the (multi)hash from the `info` dictionary.

> [!info]
> This constitutes the second validation step: after checking in the first step described above that the `info` dictionary matches the requested `info` hash, here we make sure that the received content matches the metadata in the `info` dictionary, and thus that it is indeed the content identified by the `info` hash from the request.

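The statefulness described below can be sketched as a small class: the validator tracks which piece index it expects next, so callers never pass an index explicitly. This is a hypothetical Python mirror of `TorrentPieceValidator`, not its real interface:

```python
import hashlib

class PieceValidator:
    def __init__(self, piece_hashes: list[bytes]):
        self.piece_hashes = piece_hashes
        self.expected = 0    # index of the next piece to validate
        self.confirmed = -1  # last piece released to the streaming side

    def validate_piece(self, blocks: list[bytes]) -> int:
        # Returns the validated piece index, or -1 on failure.
        if self.expected >= len(self.piece_hashes):
            return -1
        digest = hashlib.sha1(b"".join(blocks)).digest()
        if digest != self.piece_hashes[self.expected]:
            return -1
        index, self.expected = self.expected, self.expected + 1
        return index

    def confirm_current_piece(self) -> int:
        # Signals the streaming task that the piece may be sent to the client.
        self.confirmed += 1
        return self.confirmed

piece = bytes(16_384) * 2
v = PieceValidator([hashlib.sha1(piece).digest()])
assert v.validate_piece([bytes(16_384), bytes(16_384)]) == 0
assert v.confirm_current_piece() == 0
assert v.validate_piece([b"garbage"]) == -1  # no further pieces expected
```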
`TorrentPieceValidator` maintains internal state so that it knows which piece is expected at any given moment - this is why it does not need a piece index argument to validate the blocks. Upon successful validation it returns the index of the validated piece. We then call `pieceValidator.confirmCurrentPiece` to *notify* the REST API streaming task, which is awaiting on `torrentPieceValidator.waitForNextPiece()` before streaming the validated blocks to the requesting client:

```nim
proc retrieveInfoHash(
    node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef
): Future[void] {.async.} =
  ## Download torrent from the node in a streaming
  ## manner
  ##
  var stream: LPStream

  var bytes = 0
  try:
    without torrent =? (await node.retrieveTorrent(infoHash)), err:
      error "Unable to fetch Torrent Metadata", err = err.msg
      resp.status = Http404
      await resp.sendBody(err.msg)
      return
    let (torrentManifest, codexManifest) = torrent

    if codexManifest.mimetype.isSome:
      resp.setHeader("Content-Type", codexManifest.mimetype.get())
    else:
      resp.addHeader("Content-Type", "application/octet-stream")

    if codexManifest.filename.isSome:
      resp.setHeader(
        "Content-Disposition",
        "attachment; filename=\"" & codexManifest.filename.get() & "\"",
      )
    else:
      resp.setHeader("Content-Disposition", "attachment")

    await resp.prepareChunked()

    let torrentPieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest)

    # assign to the outer `stream`: a fresh `let` here would shadow it,
    # leaving the `finally` block below with a nil stream
    stream =
      await node.streamTorrent(torrentManifest, codexManifest, torrentPieceValidator)

    while not stream.atEof:
      trace "Waiting for piece..."
      let pieceIndex = await torrentPieceValidator.waitForNextPiece()

      if -1 == pieceIndex:
        warn "No more torrent pieces expected. TorrentPieceValidator might be out of sync!"
        break

      trace "Got piece", pieceIndex

      let blocksPerPieceIter = torrentPieceValidator.getNewBlocksPerPieceIterator()
      while not blocksPerPieceIter.finished and not stream.atEof:
        var buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
        # wait for the next piece to be prefetched
        let len = await stream.readOnce(addr buff[0], buff.len)

        buff.setLen(len)
        if buff.len <= 0:
          break

        bytes += buff.len

        await resp.sendChunk(addr buff[0], buff.len)
        discard blocksPerPieceIter.next()
    await resp.finish()
    codex_api_downloads.inc()
  except CancelledError as exc:
    info "Stream cancelled", exc = exc.msg
    raise exc
  except CatchableError as exc:
    warn "Error streaming blocks", exc = exc.msg
    resp.status = Http500
    if resp.isPending():
      await resp.sendBody(exc.msg)
  finally:
    info "Sent bytes for torrent", infoHash = $infoHash, bytes
    if not stream.isNil:
      await stream.close()
```

Now, two important points. First, if the streaming happens to be interrupted, the stream will be closed in the `finally` block. This, in turn, will be detected by `monitorStream` in `streamTorrent`, causing the `prefetch` task to be cancelled. Second, when either piece validation fails, or any of the awaited `getBlock` futures fails, `prefetch` will return an error, which will cause the stream to be closed:

```nim
proc prefetch(): Future[void] {.async: (raises: []).} =
  try:
    if err =? (
      await self.fetchPieces(torrentManifest, codexManifest, pieceValidator)
    ).errorOption:
      error "Unable to fetch blocks", err = err.msg
      await noCancel pieceValidator.cancel()
      await noCancel stream.close()
  except CancelledError:
    trace "Prefetch cancelled"
```

Without this detection mechanism, we would either continue fetching blocks even when the streaming API request has been interrupted, or we would continue streaming even when it is already known that the piece validation phase has failed. This could result in potentially invalid content being returned to the client: after any failure in the `prefetch` task, the pieces are no longer validated, and thus it does not make sense to continue the streaming operation.

The `stream.readOnce` called in the streaming loop is implemented in `StoreStream`, which uses the same underlying `networkStore` that is also used in the `fetchPieces` proc shown above. It will be calling that same `getBlock` operation, which, if the block is not already in the local store (it may be there either because it was stored previously or as a result of the prefetch operation), will request it from the block exchange engine via the `BlockExcEngine.requestBlock` operation. In case there is already a pending request for the given block address, the `PendingBlocksManager` will return the existing block handle, so that `BlockExcEngine.requestBlock` will not cause a duplicate request. It could, however, potentially return an invalid block to the client before the containing piece has been validated in the prefetch phase.

If you want to experiment with uploading and downloading BitTorrent content yourself, check [[Upload and download BitTorrent content with Codex - demo]].