bitswap.go
1 package node 2 3 import ( 4 "context" 5 "errors" 6 "io" 7 "time" 8 9 "github.com/dustin/go-humanize" 10 "github.com/ipfs/boxo/bitswap" 11 "github.com/ipfs/boxo/bitswap/client" 12 "github.com/ipfs/boxo/bitswap/network" 13 bsnet "github.com/ipfs/boxo/bitswap/network/bsnet" 14 "github.com/ipfs/boxo/bitswap/network/httpnet" 15 blockstore "github.com/ipfs/boxo/blockstore" 16 exchange "github.com/ipfs/boxo/exchange" 17 rpqm "github.com/ipfs/boxo/routing/providerquerymanager" 18 "github.com/ipfs/go-cid" 19 ipld "github.com/ipfs/go-ipld-format" 20 version "github.com/ipfs/kubo" 21 "github.com/ipfs/kubo/config" 22 "github.com/libp2p/go-libp2p/core/host" 23 peer "github.com/libp2p/go-libp2p/core/peer" 24 "github.com/libp2p/go-libp2p/core/routing" 25 "go.uber.org/fx" 26 27 blocks "github.com/ipfs/go-block-format" 28 "github.com/ipfs/kubo/core/node/helpers" 29 ) 30 31 // Docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#internalbitswap 32 const ( 33 DefaultEngineBlockstoreWorkerCount = 128 34 DefaultTaskWorkerCount = 8 35 DefaultEngineTaskWorkerCount = 8 36 DefaultMaxOutstandingBytesPerPeer = 1 << 20 37 DefaultProviderSearchDelay = 1000 * time.Millisecond 38 DefaultMaxProviders = 10 // matching BitswapClientDefaultMaxProviders from https://github.com/ipfs/boxo/blob/v0.29.1/bitswap/internal/defaults/defaults.go#L15 39 DefaultWantHaveReplaceSize = 1024 40 ) 41 42 type bitswapOptionsOut struct { 43 fx.Out 44 45 BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"` 46 } 47 48 // BitswapOptions creates configuration options for Bitswap from the config file 49 // and whether to provide data. 50 func BitswapOptions(cfg *config.Config) any { 51 return func() bitswapOptionsOut { 52 var internalBsCfg config.InternalBitswap 53 if cfg.Internal.Bitswap != nil { 54 internalBsCfg = *cfg.Internal.Bitswap 55 } 56 57 opts := []bitswap.Option{ 58 bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale 59 bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))), 60 bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))), 61 bitswap.EngineTaskWorkerCount(int(internalBsCfg.EngineTaskWorkerCount.WithDefault(DefaultEngineTaskWorkerCount))), 62 bitswap.MaxOutstandingBytesPerPeer(int(internalBsCfg.MaxOutstandingBytesPerPeer.WithDefault(DefaultMaxOutstandingBytesPerPeer))), 63 bitswap.WithWantHaveReplaceSize(int(internalBsCfg.WantHaveReplaceSize.WithDefault(DefaultWantHaveReplaceSize))), 64 } 65 66 return bitswapOptionsOut{BitswapOpts: opts} 67 } 68 } 69 70 type bitswapIn struct { 71 fx.In 72 73 Mctx helpers.MetricsCtx 74 Cfg *config.Config 75 Host host.Host 76 Discovery routing.ContentDiscovery 77 Bs blockstore.GCBlockstore 78 BitswapOpts []bitswap.Option `group:"bitswap-options"` 79 } 80 81 // Bitswap creates the BitSwap server/client instance. 82 // If Bitswap.ServerEnabled is false, the node will act only as a client 83 // using an empty blockstore to prevent serving blocks to other peers. 84 func Bitswap(serverEnabled, libp2pEnabled, httpEnabled bool) any { 85 return func(in bitswapIn, lc fx.Lifecycle) (*bitswap.Bitswap, error) { 86 var bitswapNetworks, bitswapLibp2p network.BitSwapNetwork 87 var bitswapBlockstore blockstore.Blockstore = in.Bs 88 89 connEvtMgr := network.NewConnectEventManager() 90 91 libp2pEnabled := in.Cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled) 92 if libp2pEnabled { 93 bitswapLibp2p = bsnet.NewFromIpfsHost( 94 in.Host, 95 bsnet.WithConnectEventManager(connEvtMgr), 96 ) 97 } 98 99 if httpEnabled { 100 httpCfg := in.Cfg.HTTPRetrieval 101 maxBlockSize, err := humanize.ParseBytes(httpCfg.MaxBlockSize.WithDefault(config.DefaultHTTPRetrievalMaxBlockSize)) 102 if err != nil { 103 return nil, err 104 } 105 logger.Infof("HTTP Retrieval enabled: Allowlist: %t. Denylist: %t", 106 httpCfg.Allowlist != nil, 107 httpCfg.Denylist != nil, 108 ) 109 110 bitswapHTTP := httpnet.New(in.Host, 111 httpnet.WithHTTPWorkers(int(httpCfg.NumWorkers.WithDefault(config.DefaultHTTPRetrievalNumWorkers))), 112 httpnet.WithAllowlist(httpCfg.Allowlist), 113 httpnet.WithDenylist(httpCfg.Denylist), 114 httpnet.WithInsecureSkipVerify(httpCfg.TLSInsecureSkipVerify.WithDefault(config.DefaultHTTPRetrievalTLSInsecureSkipVerify)), 115 httpnet.WithMaxBlockSize(int64(maxBlockSize)), 116 httpnet.WithUserAgent(version.GetUserAgentVersion()), 117 httpnet.WithMetricsLabelsForEndpoints(httpCfg.Allowlist), 118 httpnet.WithConnectEventManager(connEvtMgr), 119 ) 120 bitswapNetworks = network.New(in.Host.Peerstore(), bitswapLibp2p, bitswapHTTP) 121 } else if libp2pEnabled { 122 bitswapNetworks = bitswapLibp2p 123 } else { 124 return nil, errors.New("invalid configuration: Bitswap.Libp2pEnabled and HTTPRetrieval.Enabled are both disabled, unable to initialize Bitswap") 125 } 126 127 // Kubo uses own, customized ProviderQueryManager 128 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false))) 129 var maxProviders int = DefaultMaxProviders 130 131 var bcDisposition string 132 if in.Cfg.Internal.Bitswap != nil { 133 maxProviders = int(in.Cfg.Internal.Bitswap.ProviderSearchMaxResults.WithDefault(DefaultMaxProviders)) 134 if in.Cfg.Internal.Bitswap.BroadcastControl != nil { 135 bcCfg := in.Cfg.Internal.Bitswap.BroadcastControl 136 bcEnable := bcCfg.Enable.WithDefault(config.DefaultBroadcastControlEnable) 137 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(bcEnable))) 138 if bcEnable { 139 bcDisposition = "enabled" 140 bcMaxPeers := int(bcCfg.MaxPeers.WithDefault(config.DefaultBroadcastControlMaxPeers)) 141 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(bcMaxPeers))) 142 143 bcLocalPeers := bcCfg.LocalPeers.WithDefault(config.DefaultBroadcastControlLocalPeers) 144 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(bcLocalPeers))) 145 146 bcPeeredPeers := bcCfg.PeeredPeers.WithDefault(config.DefaultBroadcastControlPeeredPeers) 147 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(bcPeeredPeers))) 148 149 bcMaxRandomPeers := int(bcCfg.MaxRandomPeers.WithDefault(config.DefaultBroadcastControlMaxRandomPeers)) 150 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(bcMaxRandomPeers))) 151 152 bcSendToPendingPeers := bcCfg.SendToPendingPeers.WithDefault(config.DefaultBroadcastControlSendToPendingPeers) 153 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(bcSendToPendingPeers))) 154 } else { 155 bcDisposition = "disabled" 156 } 157 } 158 } 159 160 // If broadcast control is not configured, then configure with defaults. 161 if bcDisposition == "" { 162 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(config.DefaultBroadcastControlEnable))) 163 if config.DefaultBroadcastControlEnable { 164 bcDisposition = "enabled" 165 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(config.DefaultBroadcastControlMaxPeers))) 166 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(config.DefaultBroadcastControlLocalPeers))) 167 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(config.DefaultBroadcastControlPeeredPeers))) 168 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(config.DefaultBroadcastControlMaxRandomPeers))) 169 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(config.DefaultBroadcastControlSendToPendingPeers))) 170 } else { 171 bcDisposition = "enabled" 172 } 173 } 174 logger.Infof("bitswap client broadcast control %s", bcDisposition) 175 176 ignoredPeerIDs := make([]peer.ID, 0, len(in.Cfg.Routing.IgnoreProviders)) 177 for _, str := range in.Cfg.Routing.IgnoreProviders { 178 pid, err := peer.Decode(str) 179 if err != nil { 180 return nil, err 181 } 182 ignoredPeerIDs = append(ignoredPeerIDs, pid) 183 } 184 providerQueryMgr, err := rpqm.New(bitswapNetworks, 185 in.Discovery, 186 rpqm.WithMaxProviders(maxProviders), 187 rpqm.WithIgnoreProviders(ignoredPeerIDs...), 188 ) 189 if err != nil { 190 return nil, err 191 } 192 193 // Explicitly enable/disable server 194 in.BitswapOpts = append(in.BitswapOpts, bitswap.WithServerEnabled(serverEnabled)) 195 196 bs := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetworks, providerQueryMgr, bitswapBlockstore, in.BitswapOpts...) 197 198 lc.Append(fx.Hook{ 199 OnStop: func(ctx context.Context) error { 200 return bs.Close() 201 }, 202 }) 203 return bs, nil 204 } 205 } 206 207 // OnlineExchange creates new LibP2P backed block exchange. 208 // Returns a no-op exchange if Bitswap is disabled. 209 func OnlineExchange(isBitswapActive bool) any { 210 return func(in *bitswap.Bitswap, lc fx.Lifecycle) exchange.Interface { 211 if !isBitswapActive { 212 return &noopExchange{closer: in} 213 } 214 lc.Append(fx.Hook{ 215 OnStop: func(ctx context.Context) error { 216 return in.Close() 217 }, 218 }) 219 return in 220 } 221 } 222 223 type noopExchange struct { 224 closer io.Closer 225 } 226 227 func (e *noopExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { 228 return nil, ipld.ErrNotFound{Cid: c} 229 } 230 231 func (e *noopExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { 232 ch := make(chan blocks.Block) 233 close(ch) 234 return ch, nil 235 } 236 237 func (e *noopExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { 238 return nil 239 } 240 241 func (e *noopExchange) Close() error { 242 return e.closer.Close() 243 }