/ core / node / groups.go
groups.go
  1  package node
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  	"regexp"
  8  	"strings"
  9  	"time"
 10  
 11  	blockstore "github.com/ipfs/boxo/blockstore"
 12  	offline "github.com/ipfs/boxo/exchange/offline"
 13  	uio "github.com/ipfs/boxo/ipld/unixfs/io"
 14  	util "github.com/ipfs/boxo/util"
 15  	"github.com/ipfs/go-log/v2"
 16  	"github.com/ipfs/kubo/config"
 17  	"github.com/ipfs/kubo/core/node/libp2p"
 18  	"github.com/ipfs/kubo/p2p"
 19  	pubsub "github.com/libp2p/go-libp2p-pubsub"
 20  	"github.com/libp2p/go-libp2p-pubsub/timecache"
 21  	"github.com/libp2p/go-libp2p/core/peer"
 22  	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
 23  	"go.uber.org/fx"
 24  )
 25  
 26  var logger = log.Logger("core:constructor")
 27  
 28  var BaseLibP2P = fx.Options(
 29  	fx.Provide(libp2p.PNet),
 30  	fx.Provide(libp2p.ConnectionManager),
 31  	fx.Provide(libp2p.Host),
 32  	fx.Provide(libp2p.MultiaddrResolver),
 33  
 34  	fx.Provide(libp2p.DiscoveryHandler),
 35  
 36  	fx.Invoke(libp2p.PNetChecker),
 37  )
 38  
 39  func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
 40  	var connmgr fx.Option
 41  
 42  	// set connmgr based on Swarm.ConnMgr.Type
 43  	connMgrType := cfg.Swarm.ConnMgr.Type.WithDefault(config.DefaultConnMgrType)
 44  	switch connMgrType {
 45  	case "none":
 46  		connmgr = fx.Options() // noop
 47  	case "", "basic":
 48  		grace := cfg.Swarm.ConnMgr.GracePeriod.WithDefault(config.DefaultConnMgrGracePeriod)
 49  		low := int(cfg.Swarm.ConnMgr.LowWater.WithDefault(config.DefaultConnMgrLowWater))
 50  		high := int(cfg.Swarm.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater))
 51  		silence := cfg.Swarm.ConnMgr.SilencePeriod.WithDefault(config.DefaultConnMgrSilencePeriod)
 52  		connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace, silence))
 53  
 54  	default:
 55  		return fx.Error(fmt.Errorf("unrecognized Swarm.ConnMgr.Type: %q", connMgrType))
 56  	}
 57  
 58  	// parse PubSub config
 59  
 60  	ps, disc := fx.Options(), fx.Options()
 61  	if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
 62  		disc = fx.Provide(libp2p.TopicDiscovery())
 63  
 64  		var pubsubOptions []pubsub.Option
 65  		pubsubOptions = append(
 66  			pubsubOptions,
 67  			pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning),
 68  			pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)),
 69  		)
 70  
 71  		var seenMessagesStrategy timecache.Strategy
 72  		configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy)
 73  		switch configSeenMessagesStrategy {
 74  		case config.LastSeenMessagesStrategy:
 75  			seenMessagesStrategy = timecache.Strategy_LastSeen
 76  		case config.FirstSeenMessagesStrategy:
 77  			seenMessagesStrategy = timecache.Strategy_FirstSeen
 78  		default:
 79  			return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy))
 80  		}
 81  		pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy))
 82  
 83  		switch cfg.Pubsub.Router {
 84  		case "":
 85  			fallthrough
 86  		case "gossipsub":
 87  			ps = fx.Provide(libp2p.GossipSub(pubsubOptions...))
 88  		case "floodsub":
 89  			ps = fx.Provide(libp2p.FloodSub(pubsubOptions...))
 90  		default:
 91  			return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
 92  		}
 93  	}
 94  
 95  	autonat := fx.Options()
 96  
 97  	switch cfg.AutoNAT.ServiceMode {
 98  	default:
 99  		panic("BUG: unhandled autonat service mode")
100  	case config.AutoNATServiceDisabled:
101  	case config.AutoNATServiceUnset:
102  		// TODO
103  		//
104  		// We're enabling the AutoNAT service by default on _all_ nodes
105  		// for the moment.
106  		//
107  		// We should consider disabling it by default if the dht is set
108  		// to dhtclient.
109  		fallthrough
110  	case config.AutoNATServiceEnabled:
111  		autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, false))
112  	case config.AutoNATServiceEnabledV1Only:
113  		autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, true))
114  	}
115  
116  	enableTCPTransport := cfg.Swarm.Transports.Network.TCP.WithDefault(true)
117  	enableWebsocketTransport := cfg.Swarm.Transports.Network.Websocket.WithDefault(true)
118  	enableRelayTransport := cfg.Swarm.Transports.Network.Relay.WithDefault(true) // nolint
119  	enableRelayService := cfg.Swarm.RelayService.Enabled.WithDefault(enableRelayTransport)
120  	enableRelayClient := cfg.Swarm.RelayClient.Enabled.WithDefault(enableRelayTransport)
121  	enableAutoTLS := cfg.AutoTLS.Enabled.WithDefault(config.DefaultAutoTLSEnabled)
122  	enableAutoWSS := cfg.AutoTLS.AutoWSS.WithDefault(config.DefaultAutoWSS)
123  	atlsLog := log.Logger("autotls")
124  
125  	// Log error when relay subsystem could not be initialized due to missing dependency
126  	if !enableRelayTransport {
127  		if enableRelayService {
128  			logger.Fatal("Failed to enable `Swarm.RelayService`, it requires `Swarm.Transports.Network.Relay` to be true.")
129  		}
130  		if enableRelayClient {
131  			logger.Fatal("Failed to enable `Swarm.RelayClient`, it requires `Swarm.Transports.Network.Relay` to be true.")
132  		}
133  	}
134  
135  	switch {
136  	case enableAutoTLS && enableTCPTransport && enableWebsocketTransport:
137  		// AutoTLS for Secure WebSockets: ensure WSS listeners are in place (manual or automatic)
138  		wssWildcard := fmt.Sprintf("/tls/sni/*.%s/ws", cfg.AutoTLS.DomainSuffix.WithDefault(config.DefaultDomainSuffix))
139  		wssWildcardPresent := false
140  		customWsPresent := false
141  		customWsRegex := regexp.MustCompile(`/wss?$`)
142  		tcpRegex := regexp.MustCompile(`/tcp/\d+$`)
143  
144  		// inspect listeners defined in config at Addresses.Swarm
145  		var tcpListeners []string
146  		for _, listener := range cfg.Addresses.Swarm {
147  			// detect if user manually added /tls/sni/.../ws listener matching AutoTLS.DomainSuffix
148  			if strings.Contains(listener, wssWildcard) {
149  				atlsLog.Infof("found compatible wildcard listener in Addresses.Swarm. AutoTLS will be used on %s", listener)
150  				wssWildcardPresent = true
151  				break
152  			}
153  			// detect if user manually added own /ws or /wss listener that is
154  			// not related to AutoTLS feature
155  			if customWsRegex.MatchString(listener) {
156  				atlsLog.Infof("found custom /ws listener set by user in Addresses.Swarm. AutoTLS will not be used on %s.", listener)
157  				customWsPresent = true
158  				break
159  			}
160  			// else, remember /tcp listeners that can be reused for /tls/sni/../ws
161  			if tcpRegex.MatchString(listener) {
162  				tcpListeners = append(tcpListeners, listener)
163  			}
164  		}
165  
166  		// Append AutoTLS's wildcard listener
167  		// if no manual /ws listener was set by the user
168  		if enableAutoWSS && !wssWildcardPresent && !customWsPresent {
169  			if len(tcpListeners) == 0 {
170  				logger.Error("Invalid configuration, AutoTLS will be disabled: AutoTLS.AutoWSS=true requires at least one /tcp listener present in Addresses.Swarm, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls")
171  				enableAutoTLS = false
172  			}
173  			for _, tcpListener := range tcpListeners {
174  				wssListener := tcpListener + wssWildcard
175  				cfg.Addresses.Swarm = append(cfg.Addresses.Swarm, wssListener)
176  				atlsLog.Infof("appended AutoWSS listener: %s", wssListener)
177  			}
178  		}
179  
180  		if !wssWildcardPresent && !enableAutoWSS {
181  			logger.Error(fmt.Sprintf("Invalid configuration, AutoTLS will be disabled: AutoTLS.Enabled=true requires a /tcp listener ending with %q to be present in Addresses.Swarm or AutoTLS.AutoWSS=true, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls", wssWildcard))
182  			enableAutoTLS = false
183  		}
184  	case enableAutoTLS && !enableTCPTransport:
185  		logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.TCP to be true as well. AutoTLS will be disabled.")
186  		enableAutoTLS = false
187  	case enableAutoTLS && !enableWebsocketTransport:
188  		logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.Websocket to be true as well. AutoTLS will be disabled.")
189  		enableAutoTLS = false
190  	}
191  
192  	// Gather all the options
193  	opts := fx.Options(
194  		BaseLibP2P,
195  
196  		// identify's AgentVersion (incl. optional agent-version-suffix)
197  		fx.Provide(libp2p.UserAgent()),
198  
199  		// Services (resource management)
200  		fx.Provide(libp2p.ResourceManager(bcfg.Repo.Path(), cfg.Swarm, userResourceOverrides)),
201  		maybeProvide(libp2p.P2PForgeCertMgr(bcfg.Repo.Path(), cfg.AutoTLS, atlsLog), enableAutoTLS),
202  		maybeInvoke(libp2p.StartP2PAutoTLS, enableAutoTLS),
203  		fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
204  		fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)),
205  		fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)),
206  		fx.Provide(libp2p.RelayTransport(enableRelayTransport)),
207  		fx.Provide(libp2p.RelayService(enableRelayService, cfg.Swarm.RelayService)),
208  		fx.Provide(libp2p.Transports(cfg.Swarm.Transports)),
209  		fx.Provide(libp2p.ListenOn(cfg.Addresses.Swarm)),
210  		fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled)),
211  		fx.Provide(libp2p.ForceReachability(cfg.Internal.Libp2pForceReachability)),
212  		fx.Provide(libp2p.HolePunching(cfg.Swarm.EnableHolePunching, enableRelayClient)),
213  
214  		fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),
215  
216  		fx.Provide(libp2p.Routing),
217  		fx.Provide(libp2p.ContentRouting),
218  		fx.Provide(libp2p.ContentDiscovery),
219  
220  		fx.Provide(libp2p.BaseRouting(cfg)),
221  		maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),
222  
223  		maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
224  		maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
225  		libp2p.MaybeAutoRelay(cfg.Swarm.RelayClient.StaticRelays, cfg.Peering, enableRelayClient),
226  		autonat,
227  		connmgr,
228  		ps,
229  		disc,
230  	)
231  
232  	return opts
233  }
234  
235  // Storage groups units which setup datastore based persistence and blockstore layers
236  func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
237  	cacheOpts := blockstore.DefaultCacheOpts()
238  	cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
239  	cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
240  	if !bcfg.Permanent {
241  		cacheOpts.HasBloomFilterSize = 0
242  	}
243  
244  	finalBstore := fx.Provide(GcBlockstoreCtor)
245  	if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
246  		finalBstore = fx.Provide(FilestoreBlockstoreCtor)
247  	}
248  
249  	return fx.Options(
250  		fx.Provide(RepoConfig),
251  		fx.Provide(Datastore),
252  		fx.Provide(BaseBlockstoreCtor(
253  			cacheOpts,
254  			cfg.Datastore.HashOnRead,
255  			cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough),
256  			cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy),
257  		)),
258  		finalBstore,
259  	)
260  }
261  
262  // Identity groups units providing cryptographic identity
263  func Identity(cfg *config.Config) fx.Option {
264  	// PeerID
265  
266  	cid := cfg.Identity.PeerID
267  	if cid == "" {
268  		return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)"))
269  	}
270  	if len(cid) == 0 {
271  		return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)"))
272  	}
273  
274  	id, err := peer.Decode(cid)
275  	if err != nil {
276  		return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
277  	}
278  
279  	// Private Key
280  
281  	if cfg.Identity.PrivKey == "" {
282  		return fx.Options( // No PK (usually in tests)
283  			fx.Provide(PeerID(id)),
284  			fx.Provide(libp2p.Peerstore),
285  		)
286  	}
287  
288  	sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
289  	if err != nil {
290  		return fx.Error(err)
291  	}
292  
293  	return fx.Options( // Full identity
294  		fx.Provide(PeerID(id)),
295  		fx.Provide(PrivateKey(sk)),
296  		fx.Provide(libp2p.Peerstore),
297  
298  		fx.Invoke(libp2p.PstoreAddSelfKeys),
299  	)
300  }
301  
302  // IPNS groups namesys related units
303  var IPNS = fx.Options(
304  	fx.Provide(RecordValidator),
305  )
306  
307  // Online groups online-only units
308  func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
309  	// Namesys params
310  
311  	ipnsCacheSize := cfg.Ipns.ResolveCacheSize
312  	if ipnsCacheSize == 0 {
313  		ipnsCacheSize = DefaultIpnsCacheSize
314  	}
315  	if ipnsCacheSize < 0 {
316  		return fx.Error(errors.New("cannot specify negative resolve cache size"))
317  	}
318  
319  	// Republisher params
320  
321  	var repubPeriod, recordLifetime time.Duration
322  
323  	if cfg.Ipns.RepublishPeriod != "" {
324  		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
325  		if err != nil {
326  			return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err))
327  		}
328  
329  		if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
330  			return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d))
331  		}
332  
333  		repubPeriod = d
334  	}
335  
336  	if cfg.Ipns.RecordLifetime != "" {
337  		d, err := time.ParseDuration(cfg.Ipns.RecordLifetime)
338  		if err != nil {
339  			return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err))
340  		}
341  
342  		recordLifetime = d
343  	}
344  
345  	isBitswapLibp2pEnabled := cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled)
346  	isBitswapServerEnabled := cfg.Bitswap.ServerEnabled.WithDefault(config.DefaultBitswapServerEnabled)
347  	isHTTPRetrievalEnabled := cfg.HTTPRetrieval.Enabled.WithDefault(config.DefaultHTTPRetrievalEnabled)
348  
349  	// The Provide system handles both new CID announcements and periodic re-announcements.
350  	// Disabling is controlled by Provide.Enabled=false or setting Interval to 0.
351  	isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0
352  
353  	return fx.Options(
354  		fx.Provide(BitswapOptions(cfg)),
355  		fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)),
356  		fx.Provide(OnlineExchange(isBitswapLibp2pEnabled)),
357  		fx.Provide(DNSResolver),
358  		fx.Provide(Namesys(ipnsCacheSize, cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL))),
359  		fx.Provide(Peering),
360  		PeerWith(cfg.Peering.Peers...),
361  
362  		fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
363  
364  		fx.Provide(p2p.New),
365  
366  		LibP2P(bcfg, cfg, userResourceOverrides),
367  		OnlineProviders(isProviderEnabled, cfg),
368  	)
369  }
370  
371  // Offline groups offline alternatives to Online units
372  func Offline(cfg *config.Config) fx.Option {
373  	return fx.Options(
374  		fx.Provide(offline.Exchange),
375  		fx.Provide(DNSResolver),
376  		fx.Provide(Namesys(0, 0)),
377  		fx.Provide(libp2p.Routing),
378  		fx.Provide(libp2p.ContentRouting),
379  		fx.Provide(libp2p.OfflineRouting),
380  		fx.Provide(libp2p.ContentDiscovery),
381  		OfflineProviders(),
382  	)
383  }
384  
385  // Core groups basic IPFS services
386  var Core = fx.Options(
387  	fx.Provide(Dag),
388  	fx.Provide(FetcherConfig),
389  	fx.Provide(PathResolverConfig),
390  )
391  
392  func Networked(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
393  	if bcfg.Online {
394  		return Online(bcfg, cfg, userResourceOverrides)
395  	}
396  	return Offline(cfg)
397  }
398  
399  // IPFS builds a group of fx Options based on the passed BuildCfg
400  func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
401  	if bcfg == nil {
402  		bcfg = new(BuildCfg)
403  	}
404  
405  	bcfgOpts, cfg := bcfg.options(ctx)
406  	if cfg == nil {
407  		return bcfgOpts // error
408  	}
409  
410  	userResourceOverrides, err := bcfg.Repo.UserResourceOverrides()
411  	if err != nil {
412  		return fx.Error(err)
413  	}
414  
415  	// Migrate users of deprecated Experimental.ShardingEnabled flag
416  	if cfg.Experimental.ShardingEnabled {
417  		logger.Fatal("The `Experimental.ShardingEnabled` field is no longer used, please remove it from the config. Use Import.UnixFSHAMTDirectorySizeThreshold instead.")
418  	}
419  	if !cfg.Internal.UnixFSShardingSizeThreshold.IsDefault() {
420  		msg := "The `Internal.UnixFSShardingSizeThreshold` field was renamed to `Import.UnixFSHAMTDirectorySizeThreshold`. Please update your config.\n"
421  		if !cfg.Import.UnixFSHAMTDirectorySizeThreshold.IsDefault() {
422  			logger.Fatal(msg) // conflicting values, hard fail
423  		}
424  		logger.Error(msg)
425  		// Migrate the old OptionalString value to the new OptionalBytes field.
426  		// Since OptionalBytes embeds OptionalString, we can construct it directly
427  		// with the old value, preserving the user's original string (e.g., "256KiB").
428  		cfg.Import.UnixFSHAMTDirectorySizeThreshold = config.OptionalBytes{OptionalString: *cfg.Internal.UnixFSShardingSizeThreshold}
429  	}
430  
431  	// Validate Import configuration
432  	if err := config.ValidateImportConfig(&cfg.Import); err != nil {
433  		return fx.Error(err)
434  	}
435  
436  	// Validate Provide configuration
437  	if err := config.ValidateProvideConfig(&cfg.Provide); err != nil {
438  		return fx.Error(err)
439  	}
440  
441  	// Directory sharding settings from Import config.
442  	// These globals affect both `ipfs add` and MFS (`ipfs files` API).
443  	shardSizeThreshold := cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold)
444  	shardMaxFanout := cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout)
445  	uio.HAMTShardingSize = int(shardSizeThreshold)
446  	uio.DefaultShardWidth = int(shardMaxFanout)
447  	uio.HAMTSizeEstimation = cfg.Import.HAMTSizeEstimationMode()
448  
449  	providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
450  
451  	return fx.Options(
452  		bcfgOpts,
453  
454  		Storage(bcfg, cfg),
455  		Identity(cfg),
456  		IPNS,
457  		Networked(bcfg, cfg, userResourceOverrides),
458  		fx.Provide(BlockService(cfg)),
459  		fx.Provide(Pinning(providerStrategy)),
460  		fx.Provide(Files(providerStrategy)),
461  		Core,
462  	)
463  }