groups.go
1 package node 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "regexp" 8 "strings" 9 "time" 10 11 blockstore "github.com/ipfs/boxo/blockstore" 12 offline "github.com/ipfs/boxo/exchange/offline" 13 uio "github.com/ipfs/boxo/ipld/unixfs/io" 14 util "github.com/ipfs/boxo/util" 15 "github.com/ipfs/go-log/v2" 16 "github.com/ipfs/kubo/config" 17 "github.com/ipfs/kubo/core/node/libp2p" 18 "github.com/ipfs/kubo/p2p" 19 pubsub "github.com/libp2p/go-libp2p-pubsub" 20 "github.com/libp2p/go-libp2p-pubsub/timecache" 21 "github.com/libp2p/go-libp2p/core/peer" 22 rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" 23 "go.uber.org/fx" 24 ) 25 26 var logger = log.Logger("core:constructor") 27 28 var BaseLibP2P = fx.Options( 29 fx.Provide(libp2p.PNet), 30 fx.Provide(libp2p.ConnectionManager), 31 fx.Provide(libp2p.Host), 32 fx.Provide(libp2p.MultiaddrResolver), 33 34 fx.Provide(libp2p.DiscoveryHandler), 35 36 fx.Invoke(libp2p.PNetChecker), 37 ) 38 39 func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option { 40 var connmgr fx.Option 41 42 // set connmgr based on Swarm.ConnMgr.Type 43 connMgrType := cfg.Swarm.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) 44 switch connMgrType { 45 case "none": 46 connmgr = fx.Options() // noop 47 case "", "basic": 48 grace := cfg.Swarm.ConnMgr.GracePeriod.WithDefault(config.DefaultConnMgrGracePeriod) 49 low := int(cfg.Swarm.ConnMgr.LowWater.WithDefault(config.DefaultConnMgrLowWater)) 50 high := int(cfg.Swarm.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)) 51 silence := cfg.Swarm.ConnMgr.SilencePeriod.WithDefault(config.DefaultConnMgrSilencePeriod) 52 connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace, silence)) 53 54 default: 55 return fx.Error(fmt.Errorf("unrecognized Swarm.ConnMgr.Type: %q", connMgrType)) 56 } 57 58 // parse PubSub config 59 60 ps, disc := fx.Options(), fx.Options() 61 if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") { 62 disc = fx.Provide(libp2p.TopicDiscovery()) 63 64 var pubsubOptions []pubsub.Option 65 pubsubOptions = append( 66 pubsubOptions, 67 pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning), 68 pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)), 69 ) 70 71 var seenMessagesStrategy timecache.Strategy 72 configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy) 73 switch configSeenMessagesStrategy { 74 case config.LastSeenMessagesStrategy: 75 seenMessagesStrategy = timecache.Strategy_LastSeen 76 case config.FirstSeenMessagesStrategy: 77 seenMessagesStrategy = timecache.Strategy_FirstSeen 78 default: 79 return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy)) 80 } 81 pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy)) 82 83 switch cfg.Pubsub.Router { 84 case "": 85 fallthrough 86 case "gossipsub": 87 ps = fx.Provide(libp2p.GossipSub(pubsubOptions...)) 88 case "floodsub": 89 ps = fx.Provide(libp2p.FloodSub(pubsubOptions...)) 90 default: 91 return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router)) 92 } 93 } 94 95 autonat := fx.Options() 96 97 switch cfg.AutoNAT.ServiceMode { 98 default: 99 panic("BUG: unhandled autonat service mode") 100 case config.AutoNATServiceDisabled: 101 case config.AutoNATServiceUnset: 102 // TODO 103 // 104 // We're enabling the AutoNAT service by default on _all_ nodes 105 // for the moment. 106 // 107 // We should consider disabling it by default if the dht is set 108 // to dhtclient. 109 fallthrough 110 case config.AutoNATServiceEnabled: 111 autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, false)) 112 case config.AutoNATServiceEnabledV1Only: 113 autonat = fx.Provide(libp2p.AutoNATService(cfg.AutoNAT.Throttle, true)) 114 } 115 116 enableTCPTransport := cfg.Swarm.Transports.Network.TCP.WithDefault(true) 117 enableWebsocketTransport := cfg.Swarm.Transports.Network.Websocket.WithDefault(true) 118 enableRelayTransport := cfg.Swarm.Transports.Network.Relay.WithDefault(true) // nolint 119 enableRelayService := cfg.Swarm.RelayService.Enabled.WithDefault(enableRelayTransport) 120 enableRelayClient := cfg.Swarm.RelayClient.Enabled.WithDefault(enableRelayTransport) 121 enableAutoTLS := cfg.AutoTLS.Enabled.WithDefault(config.DefaultAutoTLSEnabled) 122 enableAutoWSS := cfg.AutoTLS.AutoWSS.WithDefault(config.DefaultAutoWSS) 123 atlsLog := log.Logger("autotls") 124 125 // Log error when relay subsystem could not be initialized due to missing dependency 126 if !enableRelayTransport { 127 if enableRelayService { 128 logger.Fatal("Failed to enable `Swarm.RelayService`, it requires `Swarm.Transports.Network.Relay` to be true.") 129 } 130 if enableRelayClient { 131 logger.Fatal("Failed to enable `Swarm.RelayClient`, it requires `Swarm.Transports.Network.Relay` to be true.") 132 } 133 } 134 135 switch { 136 case enableAutoTLS && enableTCPTransport && enableWebsocketTransport: 137 // AutoTLS for Secure WebSockets: ensure WSS listeners are in place (manual or automatic) 138 wssWildcard := fmt.Sprintf("/tls/sni/*.%s/ws", cfg.AutoTLS.DomainSuffix.WithDefault(config.DefaultDomainSuffix)) 139 wssWildcardPresent := false 140 customWsPresent := false 141 customWsRegex := regexp.MustCompile(`/wss?$`) 142 tcpRegex := regexp.MustCompile(`/tcp/\d+$`) 143 144 // inspect listeners defined in config at Addresses.Swarm 145 var tcpListeners []string 146 for _, listener := range cfg.Addresses.Swarm { 147 // detect if user manually added /tls/sni/.../ws listener matching AutoTLS.DomainSuffix 148 if strings.Contains(listener, wssWildcard) { 149 atlsLog.Infof("found compatible wildcard listener in Addresses.Swarm. AutoTLS will be used on %s", listener) 150 wssWildcardPresent = true 151 break 152 } 153 // detect if user manually added own /ws or /wss listener that is 154 // not related to AutoTLS feature 155 if customWsRegex.MatchString(listener) { 156 atlsLog.Infof("found custom /ws listener set by user in Addresses.Swarm. AutoTLS will not be used on %s.", listener) 157 customWsPresent = true 158 break 159 } 160 // else, remember /tcp listeners that can be reused for /tls/sni/../ws 161 if tcpRegex.MatchString(listener) { 162 tcpListeners = append(tcpListeners, listener) 163 } 164 } 165 166 // Append AutoTLS's wildcard listener 167 // if no manual /ws listener was set by the user 168 if enableAutoWSS && !wssWildcardPresent && !customWsPresent { 169 if len(tcpListeners) == 0 { 170 logger.Error("Invalid configuration, AutoTLS will be disabled: AutoTLS.AutoWSS=true requires at least one /tcp listener present in Addresses.Swarm, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls") 171 enableAutoTLS = false 172 } 173 for _, tcpListener := range tcpListeners { 174 wssListener := tcpListener + wssWildcard 175 cfg.Addresses.Swarm = append(cfg.Addresses.Swarm, wssListener) 176 atlsLog.Infof("appended AutoWSS listener: %s", wssListener) 177 } 178 } 179 180 if !wssWildcardPresent && !enableAutoWSS { 181 logger.Error(fmt.Sprintf("Invalid configuration, AutoTLS will be disabled: AutoTLS.Enabled=true requires a /tcp listener ending with %q to be present in Addresses.Swarm or AutoTLS.AutoWSS=true, see https://github.com/ipfs/kubo/blob/master/docs/config.md#autotls", wssWildcard)) 182 enableAutoTLS = false 183 } 184 case enableAutoTLS && !enableTCPTransport: 185 logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.TCP to be true as well. AutoTLS will be disabled.") 186 enableAutoTLS = false 187 case enableAutoTLS && !enableWebsocketTransport: 188 logger.Error("Invalid configuration: AutoTLS.Enabled=true requires Swarm.Transports.Network.Websocket to be true as well. AutoTLS will be disabled.") 189 enableAutoTLS = false 190 } 191 192 // Gather all the options 193 opts := fx.Options( 194 BaseLibP2P, 195 196 // identify's AgentVersion (incl. optional agent-version-suffix) 197 fx.Provide(libp2p.UserAgent()), 198 199 // Services (resource management) 200 fx.Provide(libp2p.ResourceManager(bcfg.Repo.Path(), cfg.Swarm, userResourceOverrides)), 201 maybeProvide(libp2p.P2PForgeCertMgr(bcfg.Repo.Path(), cfg.AutoTLS, atlsLog), enableAutoTLS), 202 maybeInvoke(libp2p.StartP2PAutoTLS, enableAutoTLS), 203 fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)), 204 fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)), 205 fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)), 206 fx.Provide(libp2p.RelayTransport(enableRelayTransport)), 207 fx.Provide(libp2p.RelayService(enableRelayService, cfg.Swarm.RelayService)), 208 fx.Provide(libp2p.Transports(cfg.Swarm.Transports)), 209 fx.Provide(libp2p.ListenOn(cfg.Addresses.Swarm)), 210 fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled)), 211 fx.Provide(libp2p.ForceReachability(cfg.Internal.Libp2pForceReachability)), 212 fx.Provide(libp2p.HolePunching(cfg.Swarm.EnableHolePunching, enableRelayClient)), 213 214 fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)), 215 216 fx.Provide(libp2p.Routing), 217 fx.Provide(libp2p.ContentRouting), 218 fx.Provide(libp2p.ContentDiscovery), 219 220 fx.Provide(libp2p.BaseRouting(cfg)), 221 maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), 222 223 maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), 224 maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap), 225 libp2p.MaybeAutoRelay(cfg.Swarm.RelayClient.StaticRelays, cfg.Peering, enableRelayClient), 226 autonat, 227 connmgr, 228 ps, 229 disc, 230 ) 231 232 return opts 233 } 234 235 // Storage groups units which setup datastore based persistence and blockstore layers 236 func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { 237 cacheOpts := blockstore.DefaultCacheOpts() 238 cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize 239 cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize)) 240 if !bcfg.Permanent { 241 cacheOpts.HasBloomFilterSize = 0 242 } 243 244 finalBstore := fx.Provide(GcBlockstoreCtor) 245 if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { 246 finalBstore = fx.Provide(FilestoreBlockstoreCtor) 247 } 248 249 return fx.Options( 250 fx.Provide(RepoConfig), 251 fx.Provide(Datastore), 252 fx.Provide(BaseBlockstoreCtor( 253 cacheOpts, 254 cfg.Datastore.HashOnRead, 255 cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough), 256 cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy), 257 )), 258 finalBstore, 259 ) 260 } 261 262 // Identity groups units providing cryptographic identity 263 func Identity(cfg *config.Config) fx.Option { 264 // PeerID 265 266 cid := cfg.Identity.PeerID 267 if cid == "" { 268 return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)")) 269 } 270 if len(cid) == 0 { 271 return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)")) 272 } 273 274 id, err := peer.Decode(cid) 275 if err != nil { 276 return fx.Error(fmt.Errorf("peer ID invalid: %s", err)) 277 } 278 279 // Private Key 280 281 if cfg.Identity.PrivKey == "" { 282 return fx.Options( // No PK (usually in tests) 283 fx.Provide(PeerID(id)), 284 fx.Provide(libp2p.Peerstore), 285 ) 286 } 287 288 sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") 289 if err != nil { 290 return fx.Error(err) 291 } 292 293 return fx.Options( // Full identity 294 fx.Provide(PeerID(id)), 295 fx.Provide(PrivateKey(sk)), 296 fx.Provide(libp2p.Peerstore), 297 298 fx.Invoke(libp2p.PstoreAddSelfKeys), 299 ) 300 } 301 302 // IPNS groups namesys related units 303 var IPNS = fx.Options( 304 fx.Provide(RecordValidator), 305 ) 306 307 // Online groups online-only units 308 func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option { 309 // Namesys params 310 311 ipnsCacheSize := cfg.Ipns.ResolveCacheSize 312 if ipnsCacheSize == 0 { 313 ipnsCacheSize = DefaultIpnsCacheSize 314 } 315 if ipnsCacheSize < 0 { 316 return fx.Error(errors.New("cannot specify negative resolve cache size")) 317 } 318 319 // Republisher params 320 321 var repubPeriod, recordLifetime time.Duration 322 323 if cfg.Ipns.RepublishPeriod != "" { 324 d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) 325 if err != nil { 326 return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)) 327 } 328 329 if !util.Debug && (d < time.Minute || d > (time.Hour*24)) { 330 return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)) 331 } 332 333 repubPeriod = d 334 } 335 336 if cfg.Ipns.RecordLifetime != "" { 337 d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) 338 if err != nil { 339 return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)) 340 } 341 342 recordLifetime = d 343 } 344 345 isBitswapLibp2pEnabled := cfg.Bitswap.Libp2pEnabled.WithDefault(config.DefaultBitswapLibp2pEnabled) 346 isBitswapServerEnabled := cfg.Bitswap.ServerEnabled.WithDefault(config.DefaultBitswapServerEnabled) 347 isHTTPRetrievalEnabled := cfg.HTTPRetrieval.Enabled.WithDefault(config.DefaultHTTPRetrievalEnabled) 348 349 // The Provide system handles both new CID announcements and periodic re-announcements. 350 // Disabling is controlled by Provide.Enabled=false or setting Interval to 0. 351 isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0 352 353 return fx.Options( 354 fx.Provide(BitswapOptions(cfg)), 355 fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)), 356 fx.Provide(OnlineExchange(isBitswapLibp2pEnabled)), 357 fx.Provide(DNSResolver), 358 fx.Provide(Namesys(ipnsCacheSize, cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL))), 359 fx.Provide(Peering), 360 PeerWith(cfg.Peering.Peers...), 361 362 fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)), 363 364 fx.Provide(p2p.New), 365 366 LibP2P(bcfg, cfg, userResourceOverrides), 367 OnlineProviders(isProviderEnabled, cfg), 368 ) 369 } 370 371 // Offline groups offline alternatives to Online units 372 func Offline(cfg *config.Config) fx.Option { 373 return fx.Options( 374 fx.Provide(offline.Exchange), 375 fx.Provide(DNSResolver), 376 fx.Provide(Namesys(0, 0)), 377 fx.Provide(libp2p.Routing), 378 fx.Provide(libp2p.ContentRouting), 379 fx.Provide(libp2p.OfflineRouting), 380 fx.Provide(libp2p.ContentDiscovery), 381 OfflineProviders(), 382 ) 383 } 384 385 // Core groups basic IPFS services 386 var Core = fx.Options( 387 fx.Provide(Dag), 388 fx.Provide(FetcherConfig), 389 fx.Provide(PathResolverConfig), 390 ) 391 392 func Networked(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option { 393 if bcfg.Online { 394 return Online(bcfg, cfg, userResourceOverrides) 395 } 396 return Offline(cfg) 397 } 398 399 // IPFS builds a group of fx Options based on the passed BuildCfg 400 func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { 401 if bcfg == nil { 402 bcfg = new(BuildCfg) 403 } 404 405 bcfgOpts, cfg := bcfg.options(ctx) 406 if cfg == nil { 407 return bcfgOpts // error 408 } 409 410 userResourceOverrides, err := bcfg.Repo.UserResourceOverrides() 411 if err != nil { 412 return fx.Error(err) 413 } 414 415 // Migrate users of deprecated Experimental.ShardingEnabled flag 416 if cfg.Experimental.ShardingEnabled { 417 logger.Fatal("The `Experimental.ShardingEnabled` field is no longer used, please remove it from the config. Use Import.UnixFSHAMTDirectorySizeThreshold instead.") 418 } 419 if !cfg.Internal.UnixFSShardingSizeThreshold.IsDefault() { 420 msg := "The `Internal.UnixFSShardingSizeThreshold` field was renamed to `Import.UnixFSHAMTDirectorySizeThreshold`. Please update your config.\n" 421 if !cfg.Import.UnixFSHAMTDirectorySizeThreshold.IsDefault() { 422 logger.Fatal(msg) // conflicting values, hard fail 423 } 424 logger.Error(msg) 425 // Migrate the old OptionalString value to the new OptionalBytes field. 426 // Since OptionalBytes embeds OptionalString, we can construct it directly 427 // with the old value, preserving the user's original string (e.g., "256KiB"). 428 cfg.Import.UnixFSHAMTDirectorySizeThreshold = config.OptionalBytes{OptionalString: *cfg.Internal.UnixFSShardingSizeThreshold} 429 } 430 431 // Validate Import configuration 432 if err := config.ValidateImportConfig(&cfg.Import); err != nil { 433 return fx.Error(err) 434 } 435 436 // Validate Provide configuration 437 if err := config.ValidateProvideConfig(&cfg.Provide); err != nil { 438 return fx.Error(err) 439 } 440 441 // Directory sharding settings from Import config. 442 // These globals affect both `ipfs add` and MFS (`ipfs files` API). 443 shardSizeThreshold := cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold) 444 shardMaxFanout := cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout) 445 uio.HAMTShardingSize = int(shardSizeThreshold) 446 uio.DefaultShardWidth = int(shardMaxFanout) 447 uio.HAMTSizeEstimation = cfg.Import.HAMTSizeEstimationMode() 448 449 providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) 450 451 return fx.Options( 452 bcfgOpts, 453 454 Storage(bcfg, cfg), 455 Identity(cfg), 456 IPNS, 457 Networked(bcfg, cfg, userResourceOverrides), 458 fx.Provide(BlockService(cfg)), 459 fx.Provide(Pinning(providerStrategy)), 460 fx.Provide(Files(providerStrategy)), 461 Core, 462 ) 463 }