/ 10 Notes / Codex WantList.md
Codex WantList.md
  1  ---
  2  tags:
  3    - codex/want-list
  4    - codex/block-exchange
  5  related:
  6    - "[[Codex Block Exchange Protocol]]"
  7    - "[[Uploading and downloading content in Codex]]"
  8  ---
  9  #codex/want-list #codex/block-exchange 
 10  
 11  | related | [[Codex Block Exchange Protocol]], [[Uploading and downloading content in Codex]] |
 12  | ------- | --------------------------------------------------------------------------------- |
 13  
 14  When engine is being created, it subscribes to the `PeerEventKind.Joined` and `PeerEventKind.Left`:
 15  
 16  ```nim
 17  network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
 18  network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
 19  ```
 20  
 21  `PeerEventKind.Joined` is triggered when peers connects to us, and `PeerEventKind.Left` when peer disconnects from us.
 22  
 23  When peer is joining, we call `setupPeer`
 24  
 25  ```nim
 26  proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
 27    ## Perform initial setup, such as want
 28    ## list exchange
 29    ##
 30  
 31    trace "Setting up peer", peer
 32  
 33    if peer notin b.peers:
 34      trace "Setting up new peer", peer
 35      b.peers.add(BlockExcPeerCtx(id: peer))
 36      trace "Added peer", peers = b.peers.len
 37  
 38    # broadcast our want list, the other peer will do the same
 39    if b.pendingBlocks.wantListLen > 0:
 40      trace "Sending our want list to a peer", peer
 41      let cids = toSeq(b.pendingBlocks.wantList)
 42      await b.network.request.sendWantList(peer, cids, full = true)
 43  
 44    if address =? b.pricing .? address:
 45      await b.network.request.sendAccount(peer, Account(address: address))
 46  ```
 47  
 48  Here is where we send the joining peer our `WantList`.
 49  
 50  We get it from `PendingBlocksManager`. `PendingBlocksManager` has a list of *pending* blocks. Every time engine requests a block via `requestBlock(address)`, the block corresponding to the `address` provided becomes pending. It is done via call:
 51  
 52  ```nim
 53  b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
 54  ```
 55  
 56  `getWantHandle` will put the request address on its `blocks` list, which is a mapping from `BlockAddress` to `BlockReq`:
 57  
 58  ```nim
 59  p.blocks[address] = BlockReq(
 60    handle: newFuture[Block]("pendingBlocks.getWantHandle"),
 61    inFlight: inFlight,
 62    startTime: getMonoTime().ticks,
 63  )
 64  ```
 65  
 66  At any given time, the pending blocks form our `WantList` and it will be sent to the joining peer:
 67  
 68  ```nim
 69  await b.network.request.sendWantList(peer, cids, full = true)
 70  ```
 71  
 72  where `request.sendWantList` is set to:
 73  
 74  ```nim
 75  proc sendWantList(
 76    id: PeerId,
 77    cids: seq[BlockAddress],
 78    priority: int32 = 0,
 79    cancel: bool = false,
 80    wantType: WantType = WantType.WantHave,
 81    full: bool = false,
 82    sendDontHave: bool = false,
 83  ): Future[void] {.gcsafe.} =
 84    self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave)
 85  ```
 86  
 87  in `BlockExcNetwork.new`.
 88  
 89  We see that `wantType` argument takes the default value `WantType.WantHave`. The `full` argument is set to `true` in this case, which means this is our full `WantList`.
 90  
 91  Thus, intuitively, if a `cid` (`BlockAddress` to be precise) is on the `WantList` with `WantType.WantHave` it means that the corresponding node *wants to have* that cid.
 92  
 93  Let's look closer at the `BlockExcEngine.requestBlock` proc:
 94  
 95  ```nim
 96  proc requestBlock*(
 97      b: BlockExcEngine, address: BlockAddress
 98  ): Future[?!Block] {.async.} =
 99    let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
100  
101    if not b.pendingBlocks.isInFlight(address):
102      let peers = b.peers.getPeersForBlock(address)
103  
104      if peers.with.len == 0:
105        b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
106      else:
107        let selected = pickPseudoRandom(address, peers.with)
108        asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
109        b.pendingBlocks.setInFlight(address)
110        await b.sendWantBlock(@[address], selected)
111  
112      await b.sendWantHave(@[address], peers.without)
113  
114    # Don't let timeouts bubble up. We can't be too broad here or we break
115    # cancellations.
116    try:
117      success await blockFuture
118    except AsyncTimeoutError as err:
119      failure err
120  ```
121  
122  > [!warning]
123  > `requestBlock` as we see above is undergoing some important changes and for a good reason. First it will be called `downloadInternal` and the `getWantHandle` will no longer be awaiting on returned handle (thus ultimately it will be doing what it days it does). Other important change to notice is that `sendWantHave` will be called only if there are no peers with the requested address; in the version above we see that `WantHave` is sent even if we have a peer with the request address to which we have just sent `WantBlock`.
124  
125  When a node *requests* a block, we first check if the given pending block has the `inFlight` attribute set, indicating that the block has been recently requested from a remote node known to have it. If it is not the case, we first gather all the peers that have given `cid` and the complementary list of peers that do not have the given `cid`. If no peer in the swarm is having that `cid`, we will trigger discovery. Otherwise, we (pseudo) randomly choose one peer known to have the given `cid` and send it the `WantBlock` request. Subsequently, we then send the `WantHave` request to all the peers known not to have that `cid` (so that they know we are interested in it and let us know that have it once it is the case).
126  
127  Now, let's look what happens when a peer receives the `WantList`. This is handled by `BlockExcEngine.wantListHandler`:
128  
129  ```nim
130  proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} =
131    let peerCtx = b.peers.get(peer)
132  
133    if peerCtx.isNil:
134      return
135  
136    var
137      presence: seq[BlockPresence]
138      schedulePeer = false
139  
140    for e in wantList.entries:
141      let idx = peerCtx.peerWants.findIt(it.address == e.address)
142  
143      logScope:
144        peer = peerCtx.id
145        address = e.address
146        wantType = $e.wantType
147  
148      if idx < 0: # Adding new entry to peer wants
149        let
150          have = await e.address in b.localStore
151          price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
152  
153        case e.wantType
154        of WantType.WantHave:
155          if have:
156            presence.add(
157              BlockPresence(
158                address: e.address, `type`: BlockPresenceType.Have, price: price
159              )
160            )
161          else:
162            if e.sendDontHave:
163              presence.add(
164                BlockPresence(
165                  address: e.address, `type`: BlockPresenceType.DontHave, price: price
166                )
167              )
168            peerCtx.peerWants.add(e)
169  
170          codex_block_exchange_want_have_lists_received.inc()
171        of WantType.WantBlock:
172          peerCtx.peerWants.add(e)
173          schedulePeer = true
174          codex_block_exchange_want_block_lists_received.inc()
175      else: # Updating existing entry in peer wants
176        # peer doesn't want this block anymore
177        if e.cancel:
178          trace "Canceling want for block", address = e.address
179          peerCtx.peerWants.del(idx)
180        else:
181          # peer might want to ask for the same cid with
182          # different want params
183          trace "Updating want for block", address = e.address
184          peerCtx.peerWants[idx] = e # update entry
185  
186    if presence.len > 0:
187      trace "Sending presence to remote", items = presence.mapIt($it).join(",")
188      await b.network.request.sendPresence(peer, presence)
189  
190    if schedulePeer:
191      if not b.scheduleTask(peerCtx):
192        warn "Unable to schedule task for peer", peer
193  ```
194  
195  We go though the `WantList` entries, one-by-one.
196  
197  1. We check if the `WantList` item is already on the locally kept `WantList` associated with that peer (`peerCtx.peerWants`).
198  2. If it is not the case, we add new entry to the peer's `WantList`:
199  	1. We first check if we already have the block corresponding to the `WantList` item in our `localStore`.
200  	2. If we do, and the `WantList` item is `WantHave`, we add an entry to the `presence` list, otherwise (i.e. when `WantList` item is `WantHave` but we do not have the corresponding block in `localStore`) we add the entry to `peerCtx.peerWants`. If `WantList` item is `WantBlock` we add the corresponding entry to `peerCtx.peerWants` and set a flag to schedule a task where we will eventually send the requested block to the remote peer (we do that even regardless of if we have a block or not in `localStore`).
201  3. If the `WantList` item is already on the locally kept `WantList` associated with that peer, we just update the entry.