Codex WantList.md
1 --- 2 tags: 3 - codex/want-list 4 - codex/block-exchange 5 related: 6 - "[[Codex Block Exchange Protocol]]" 7 - "[[Uploading and downloading content in Codex]]" 8 --- 9 #codex/want-list #codex/block-exchange 10 11 | related | [[Codex Block Exchange Protocol]], [[Uploading and downloading content in Codex]] | 12 | ------- | --------------------------------------------------------------------------------- | 13 14 When engine is being created, it subscribes to the `PeerEventKind.Joined` and `PeerEventKind.Left`: 15 16 ```nim 17 network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) 18 network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) 19 ``` 20 21 `PeerEventKind.Joined` is triggered when peers connects to us, and `PeerEventKind.Left` when peer disconnects from us. 22 23 When peer is joining, we call `setupPeer` 24 25 ```nim 26 proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = 27 ## Perform initial setup, such as want 28 ## list exchange 29 ## 30 31 trace "Setting up peer", peer 32 33 if peer notin b.peers: 34 trace "Setting up new peer", peer 35 b.peers.add(BlockExcPeerCtx(id: peer)) 36 trace "Added peer", peers = b.peers.len 37 38 # broadcast our want list, the other peer will do the same 39 if b.pendingBlocks.wantListLen > 0: 40 trace "Sending our want list to a peer", peer 41 let cids = toSeq(b.pendingBlocks.wantList) 42 await b.network.request.sendWantList(peer, cids, full = true) 43 44 if address =? b.pricing .? address: 45 await b.network.request.sendAccount(peer, Account(address: address)) 46 ``` 47 48 Here is where we send the joining peer our `WantList`. 49 50 We get it from `PendingBlocksManager`. `PendingBlocksManager` has a list of *pending* blocks. Every time engine requests a block via `requestBlock(address)`, the block corresponding to the `address` provided becomes pending. It is done via call: 51 52 ```nim 53 b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) 54 ``` 55 56 `getWantHandle` will put the request address on its `blocks` list, which is a mapping from `BlockAddress` to `BlockReq`: 57 58 ```nim 59 p.blocks[address] = BlockReq( 60 handle: newFuture[Block]("pendingBlocks.getWantHandle"), 61 inFlight: inFlight, 62 startTime: getMonoTime().ticks, 63 ) 64 ``` 65 66 At any given time, the pending blocks form our `WantList` and it will be sent to the joining peer: 67 68 ```nim 69 await b.network.request.sendWantList(peer, cids, full = true) 70 ``` 71 72 where `request.sendWantList` is set to: 73 74 ```nim 75 proc sendWantList( 76 id: PeerId, 77 cids: seq[BlockAddress], 78 priority: int32 = 0, 79 cancel: bool = false, 80 wantType: WantType = WantType.WantHave, 81 full: bool = false, 82 sendDontHave: bool = false, 83 ): Future[void] {.gcsafe.} = 84 self.sendWantList(id, cids, priority, cancel, wantType, full, sendDontHave) 85 ``` 86 87 in `BlockExcNetwork.new`. 88 89 We see that `wantType` argument takes the default value `WantType.WantHave`. The `full` argument is set to `true` in this case, which means this is our full `WantList`. 90 91 Thus, intuitively, if a `cid` (`BlockAddress` to be precise) is on the `WantList` with `WantType.WantHave` it means that the corresponding node *wants to have* that cid. 92 93 Let's look closer at the `BlockExcEngine.requestBlock` proc: 94 95 ```nim 96 proc requestBlock*( 97 b: BlockExcEngine, address: BlockAddress 98 ): Future[?!Block] {.async.} = 99 let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) 100 101 if not b.pendingBlocks.isInFlight(address): 102 let peers = b.peers.getPeersForBlock(address) 103 104 if peers.with.len == 0: 105 b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) 106 else: 107 let selected = pickPseudoRandom(address, peers.with) 108 asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) 109 b.pendingBlocks.setInFlight(address) 110 await b.sendWantBlock(@[address], selected) 111 112 await b.sendWantHave(@[address], peers.without) 113 114 # Don't let timeouts bubble up. We can't be too broad here or we break 115 # cancellations. 116 try: 117 success await blockFuture 118 except AsyncTimeoutError as err: 119 failure err 120 ``` 121 122 > [!warning] 123 > `requestBlock` as we see above is undergoing some important changes and for a good reason. First it will be called `downloadInternal` and the `getWantHandle` will no longer be awaiting on returned handle (thus ultimately it will be doing what it days it does). Other important change to notice is that `sendWantHave` will be called only if there are no peers with the requested address; in the version above we see that `WantHave` is sent even if we have a peer with the request address to which we have just sent `WantBlock`. 124 125 When a node *requests* a block, we first check if the given pending block has the `inFlight` attribute set, indicating that the block has been recently requested from a remote node known to have it. If it is not the case, we first gather all the peers that have given `cid` and the complementary list of peers that do not have the given `cid`. If no peer in the swarm is having that `cid`, we will trigger discovery. Otherwise, we (pseudo) randomly choose one peer known to have the given `cid` and send it the `WantBlock` request. Subsequently, we then send the `WantHave` request to all the peers known not to have that `cid` (so that they know we are interested in it and let us know that have it once it is the case). 126 127 Now, let's look what happens when a peer receives the `WantList`. This is handled by `BlockExcEngine.wantListHandler`: 128 129 ```nim 130 proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} = 131 let peerCtx = b.peers.get(peer) 132 133 if peerCtx.isNil: 134 return 135 136 var 137 presence: seq[BlockPresence] 138 schedulePeer = false 139 140 for e in wantList.entries: 141 let idx = peerCtx.peerWants.findIt(it.address == e.address) 142 143 logScope: 144 peer = peerCtx.id 145 address = e.address 146 wantType = $e.wantType 147 148 if idx < 0: # Adding new entry to peer wants 149 let 150 have = await e.address in b.localStore 151 price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) 152 153 case e.wantType 154 of WantType.WantHave: 155 if have: 156 presence.add( 157 BlockPresence( 158 address: e.address, `type`: BlockPresenceType.Have, price: price 159 ) 160 ) 161 else: 162 if e.sendDontHave: 163 presence.add( 164 BlockPresence( 165 address: e.address, `type`: BlockPresenceType.DontHave, price: price 166 ) 167 ) 168 peerCtx.peerWants.add(e) 169 170 codex_block_exchange_want_have_lists_received.inc() 171 of WantType.WantBlock: 172 peerCtx.peerWants.add(e) 173 schedulePeer = true 174 codex_block_exchange_want_block_lists_received.inc() 175 else: # Updating existing entry in peer wants 176 # peer doesn't want this block anymore 177 if e.cancel: 178 trace "Canceling want for block", address = e.address 179 peerCtx.peerWants.del(idx) 180 else: 181 # peer might want to ask for the same cid with 182 # different want params 183 trace "Updating want for block", address = e.address 184 peerCtx.peerWants[idx] = e # update entry 185 186 if presence.len > 0: 187 trace "Sending presence to remote", items = presence.mapIt($it).join(",") 188 await b.network.request.sendPresence(peer, presence) 189 190 if schedulePeer: 191 if not b.scheduleTask(peerCtx): 192 warn "Unable to schedule task for peer", peer 193 ``` 194 195 We go though the `WantList` entries, one-by-one. 196 197 1. We check if the `WantList` item is already on the locally kept `WantList` associated with that peer (`peerCtx.peerWants`). 198 2. If it is not the case, we add new entry to the peer's `WantList`: 199 1. We first check if we already have the block corresponding to the `WantList` item in our `localStore`. 200 2. If we do, and the `WantList` item is `WantHave`, we add an entry to the `presence` list, otherwise (i.e. when `WantList` item is `WantHave` but we do not have the corresponding block in `localStore`) we add the entry to `peerCtx.peerWants`. If `WantList` item is `WantBlock` we add the corresponding entry to `peerCtx.peerWants` and set a flag to schedule a task where we will eventually send the requested block to the remote peer (we do that even regardless of if we have a block or not in `localStore`). 201 3. If the `WantList` item is already on the locally kept `WantList` associated with that peer, we just update the entry.