/ core / node / bitswap.go
bitswap.go
  1  package node
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"io"
  7  	"time"
  8  
  9  	"github.com/dustin/go-humanize"
 10  	"github.com/ipfs/boxo/bitswap"
 11  	"github.com/ipfs/boxo/bitswap/client"
 12  	"github.com/ipfs/boxo/bitswap/network"
 13  	bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
 14  	"github.com/ipfs/boxo/bitswap/network/httpnet"
 15  	blockstore "github.com/ipfs/boxo/blockstore"
 16  	exchange "github.com/ipfs/boxo/exchange"
 17  	rpqm "github.com/ipfs/boxo/routing/providerquerymanager"
 18  	"github.com/ipfs/go-cid"
 19  	ipld "github.com/ipfs/go-ipld-format"
 20  	version "github.com/ipfs/kubo"
 21  	"github.com/ipfs/kubo/config"
 22  	"github.com/libp2p/go-libp2p/core/host"
 23  	peer "github.com/libp2p/go-libp2p/core/peer"
 24  	"github.com/libp2p/go-libp2p/core/routing"
 25  	"go.uber.org/fx"
 26  
 27  	blocks "github.com/ipfs/go-block-format"
 28  	"github.com/ipfs/kubo/core/node/helpers"
 29  )
 30  
 31  // Docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#internalbitswap
 32  const (
 33  	DefaultEngineBlockstoreWorkerCount = 128
 34  	DefaultTaskWorkerCount             = 8
 35  	DefaultEngineTaskWorkerCount       = 8
 36  	DefaultMaxOutstandingBytesPerPeer  = 1 << 20
 37  	DefaultProviderSearchDelay         = 1000 * time.Millisecond
 38  	DefaultMaxProviders                = 10 // matching BitswapClientDefaultMaxProviders from https://github.com/ipfs/boxo/blob/v0.29.1/bitswap/internal/defaults/defaults.go#L15
 39  	DefaultWantHaveReplaceSize         = 1024
 40  )
 41  
// bitswapOptionsOut is the fx result struct returned by the constructor that
// BitswapOptions builds. Its only payload is tagged for the "bitswap-options"
// value group with the flatten modifier.
type bitswapOptionsOut struct {
	fx.Out

	// BitswapOpts holds the bitswap options derived from the config file.
	BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
}
 47  
 48  // BitswapOptions creates configuration options for Bitswap from the config file
 49  // and whether to provide data.
 50  func BitswapOptions(cfg *config.Config) any {
 51  	return func() bitswapOptionsOut {
 52  		var internalBsCfg config.InternalBitswap
 53  		if cfg.Internal.Bitswap != nil {
 54  			internalBsCfg = *cfg.Internal.Bitswap
 55  		}
 56  
 57  		opts := []bitswap.Option{
 58  			bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
 59  			bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
 60  			bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
 61  			bitswap.EngineTaskWorkerCount(int(internalBsCfg.EngineTaskWorkerCount.WithDefault(DefaultEngineTaskWorkerCount))),
 62  			bitswap.MaxOutstandingBytesPerPeer(int(internalBsCfg.MaxOutstandingBytesPerPeer.WithDefault(DefaultMaxOutstandingBytesPerPeer))),
 63  			bitswap.WithWantHaveReplaceSize(int(internalBsCfg.WantHaveReplaceSize.WithDefault(DefaultWantHaveReplaceSize))),
 64  		}
 65  
 66  		return bitswapOptionsOut{BitswapOpts: opts}
 67  	}
 68  }
 69  
// bitswapIn is the fx parameter struct consumed by the constructor that
// Bitswap builds.
type bitswapIn struct {
	fx.In

	// Mctx is combined with the fx lifecycle to derive the context handed to bitswap.New.
	Mctx helpers.MetricsCtx
	// Cfg is the node configuration (Bitswap, HTTPRetrieval, Internal.Bitswap and Routing sections are read).
	Cfg *config.Config
	// Host is the libp2p host the bitswap networks are built on.
	Host host.Host
	// Discovery is passed to the provider query manager.
	Discovery routing.ContentDiscovery
	// Bs is the blockstore handed to bitswap.New.
	Bs blockstore.GCBlockstore
	// BitswapOpts collects every option published to the "bitswap-options" value group.
	BitswapOpts []bitswap.Option `group:"bitswap-options"`
}
 80  
 81  // Bitswap creates the BitSwap server/client instance.
 82  // If Bitswap.ServerEnabled is false, the node will act only as a client
 83  // using an empty blockstore to prevent serving blocks to other peers.
 84  func Bitswap(serverEnabled, libp2pEnabled, httpEnabled bool) any {
 85  	return func(in bitswapIn, lc fx.Lifecycle) (*bitswap.Bitswap, error) {
 86  		var bitswapNetworks, bitswapLibp2p network.BitSwapNetwork
 87  		var bitswapBlockstore blockstore.Blockstore = in.Bs
 88  
 89  		connEvtMgr := network.NewConnectEventManager()
 90  
 91  		libp2pEnabled := in.Cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled)
 92  		if libp2pEnabled {
 93  			bitswapLibp2p = bsnet.NewFromIpfsHost(
 94  				in.Host,
 95  				bsnet.WithConnectEventManager(connEvtMgr),
 96  			)
 97  		}
 98  
 99  		if httpEnabled {
100  			httpCfg := in.Cfg.HTTPRetrieval
101  			maxBlockSize, err := humanize.ParseBytes(httpCfg.MaxBlockSize.WithDefault(config.DefaultHTTPRetrievalMaxBlockSize))
102  			if err != nil {
103  				return nil, err
104  			}
105  			logger.Infof("HTTP Retrieval enabled: Allowlist: %t. Denylist: %t",
106  				httpCfg.Allowlist != nil,
107  				httpCfg.Denylist != nil,
108  			)
109  
110  			bitswapHTTP := httpnet.New(in.Host,
111  				httpnet.WithHTTPWorkers(int(httpCfg.NumWorkers.WithDefault(config.DefaultHTTPRetrievalNumWorkers))),
112  				httpnet.WithAllowlist(httpCfg.Allowlist),
113  				httpnet.WithDenylist(httpCfg.Denylist),
114  				httpnet.WithInsecureSkipVerify(httpCfg.TLSInsecureSkipVerify.WithDefault(config.DefaultHTTPRetrievalTLSInsecureSkipVerify)),
115  				httpnet.WithMaxBlockSize(int64(maxBlockSize)),
116  				httpnet.WithUserAgent(version.GetUserAgentVersion()),
117  				httpnet.WithMetricsLabelsForEndpoints(httpCfg.Allowlist),
118  				httpnet.WithConnectEventManager(connEvtMgr),
119  			)
120  			bitswapNetworks = network.New(in.Host.Peerstore(), bitswapLibp2p, bitswapHTTP)
121  		} else if libp2pEnabled {
122  			bitswapNetworks = bitswapLibp2p
123  		} else {
124  			return nil, errors.New("invalid configuration: Bitswap.Libp2pEnabled and HTTPRetrieval.Enabled are both disabled, unable to initialize Bitswap")
125  		}
126  
127  		// Kubo uses own, customized ProviderQueryManager
128  		in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
129  		var maxProviders int = DefaultMaxProviders
130  
131  		var bcDisposition string
132  		if in.Cfg.Internal.Bitswap != nil {
133  			maxProviders = int(in.Cfg.Internal.Bitswap.ProviderSearchMaxResults.WithDefault(DefaultMaxProviders))
134  			if in.Cfg.Internal.Bitswap.BroadcastControl != nil {
135  				bcCfg := in.Cfg.Internal.Bitswap.BroadcastControl
136  				bcEnable := bcCfg.Enable.WithDefault(config.DefaultBroadcastControlEnable)
137  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(bcEnable)))
138  				if bcEnable {
139  					bcDisposition = "enabled"
140  					bcMaxPeers := int(bcCfg.MaxPeers.WithDefault(config.DefaultBroadcastControlMaxPeers))
141  					in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(bcMaxPeers)))
142  
143  					bcLocalPeers := bcCfg.LocalPeers.WithDefault(config.DefaultBroadcastControlLocalPeers)
144  					in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(bcLocalPeers)))
145  
146  					bcPeeredPeers := bcCfg.PeeredPeers.WithDefault(config.DefaultBroadcastControlPeeredPeers)
147  					in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(bcPeeredPeers)))
148  
149  					bcMaxRandomPeers := int(bcCfg.MaxRandomPeers.WithDefault(config.DefaultBroadcastControlMaxRandomPeers))
150  					in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(bcMaxRandomPeers)))
151  
152  					bcSendToPendingPeers := bcCfg.SendToPendingPeers.WithDefault(config.DefaultBroadcastControlSendToPendingPeers)
153  					in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(bcSendToPendingPeers)))
154  				} else {
155  					bcDisposition = "disabled"
156  				}
157  			}
158  		}
159  
160  		// If broadcast control is not configured, then configure with defaults.
161  		if bcDisposition == "" {
162  			in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlEnable(config.DefaultBroadcastControlEnable)))
163  			if config.DefaultBroadcastControlEnable {
164  				bcDisposition = "enabled"
165  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxPeers(config.DefaultBroadcastControlMaxPeers)))
166  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlLocalPeers(config.DefaultBroadcastControlLocalPeers)))
167  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlPeeredPeers(config.DefaultBroadcastControlPeeredPeers)))
168  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlMaxRandomPeers(config.DefaultBroadcastControlMaxRandomPeers)))
169  				in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.BroadcastControlSendToPendingPeers(config.DefaultBroadcastControlSendToPendingPeers)))
170  			} else {
171  				bcDisposition = "enabled"
172  			}
173  		}
174  		logger.Infof("bitswap client broadcast control %s", bcDisposition)
175  
176  		ignoredPeerIDs := make([]peer.ID, 0, len(in.Cfg.Routing.IgnoreProviders))
177  		for _, str := range in.Cfg.Routing.IgnoreProviders {
178  			pid, err := peer.Decode(str)
179  			if err != nil {
180  				return nil, err
181  			}
182  			ignoredPeerIDs = append(ignoredPeerIDs, pid)
183  		}
184  		providerQueryMgr, err := rpqm.New(bitswapNetworks,
185  			in.Discovery,
186  			rpqm.WithMaxProviders(maxProviders),
187  			rpqm.WithIgnoreProviders(ignoredPeerIDs...),
188  		)
189  		if err != nil {
190  			return nil, err
191  		}
192  
193  		// Explicitly enable/disable server
194  		in.BitswapOpts = append(in.BitswapOpts, bitswap.WithServerEnabled(serverEnabled))
195  
196  		bs := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetworks, providerQueryMgr, bitswapBlockstore, in.BitswapOpts...)
197  
198  		lc.Append(fx.Hook{
199  			OnStop: func(ctx context.Context) error {
200  				return bs.Close()
201  			},
202  		})
203  		return bs, nil
204  	}
205  }
206  
207  // OnlineExchange creates new LibP2P backed block exchange.
208  // Returns a no-op exchange if Bitswap is disabled.
209  func OnlineExchange(isBitswapActive bool) any {
210  	return func(in *bitswap.Bitswap, lc fx.Lifecycle) exchange.Interface {
211  		if !isBitswapActive {
212  			return &noopExchange{closer: in}
213  		}
214  		lc.Append(fx.Hook{
215  			OnStop: func(ctx context.Context) error {
216  				return in.Close()
217  			},
218  		})
219  		return in
220  	}
221  }
222  
223  type noopExchange struct {
224  	closer io.Closer
225  }
226  
227  func (e *noopExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
228  	return nil, ipld.ErrNotFound{Cid: c}
229  }
230  
231  func (e *noopExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
232  	ch := make(chan blocks.Block)
233  	close(ch)
234  	return ch, nil
235  }
236  
237  func (e *noopExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
238  	return nil
239  }
240  
241  func (e *noopExchange) Close() error {
242  	return e.closer.Close()
243  }