provider.go
package node

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/fetcher"
	"github.com/ipfs/boxo/mfs"
	pin "github.com/ipfs/boxo/pinning/pinner"
	"github.com/ipfs/boxo/pinning/pinner/dspinner"
	"github.com/ipfs/boxo/provider"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/mount"
	"github.com/ipfs/go-datastore/namespace"
	"github.com/ipfs/go-datastore/query"
	log "github.com/ipfs/go-log/v2"
	"github.com/ipfs/kubo/config"
	"github.com/ipfs/kubo/repo"
	"github.com/ipfs/kubo/repo/fsrepo"
	irouting "github.com/ipfs/kubo/routing"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/amino"
	"github.com/libp2p/go-libp2p-kad-dht/dual"
	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
	dht_pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	dhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider"
	"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
	ddhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider/dual"
	"github.com/libp2p/go-libp2p-kad-dht/provider/keystore"
	routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
	"github.com/libp2p/go-libp2p/core/host"
	peer "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/routing"
	ma "github.com/multiformats/go-multiaddr"
	mh "github.com/multiformats/go-multihash"
	"go.uber.org/fx"
)

const (
	// sampledBatchSize is the size of the batch used for calculating the
	// average announcement time per CID inside boxo/provider.ThroughputReport
	// and in the 'ipfs stats provide' report.
	// Used when Provide.DHT.SweepEnabled=false.
	sampledBatchSize = 1000

	// reprovideStrategyKey is the datastore key used to store the previous
	// reprovide strategy.
	reprovideStrategyKey = "/reprovideStrategy"

	// KeystoreDatastorePath is the base directory for the provider keystore datastores.
	KeystoreDatastorePath = "provider-keystore"
)

var (
	// providerDatastoreKey is the datastore namespace key for provider data.
	providerDatastoreKey = datastore.NewKey("provider")
	// keystoreDatastoreKey is the datastore namespace key for provider keystore data.
	keystoreDatastoreKey = datastore.NewKey("keystore")
)

var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")

// validateKeystoreSuffix rejects any suffix other than "0" or "1".
// The upstream library uses these two values as alternating namespace
// identifiers. Validating here prevents accidental deletion of unrelated
// directories via os.RemoveAll if the upstream ever changes its scheme.
func validateKeystoreSuffix(suffix string) error {
	if suffix != "0" && suffix != "1" {
		return fmt.Errorf("unexpected keystore suffix %q, expected \"0\" or \"1\"", suffix)
	}
	return nil
}

// reprovideAlertPollInterval is the interval between reprovide queue
// monitoring checks for slow reprovide alerts.
// Used when Provide.DHT.SweepEnabled=true.
const reprovideAlertPollInterval = 15 * time.Minute

// consecutiveAlertsThreshold is the number of consecutive polling intervals
// with sustained queue growth required before triggering a slow reprovide
// alert (3 intervals = 45 minutes).
// Used when Provide.DHT.SweepEnabled=true.
const consecutiveAlertsThreshold = 3

// DHTProvider is an interface for providing keys to a DHT swarm. It holds a
// set of keys to be advertised, and is responsible for periodically
// publishing provider records for these keys to the DHT swarm before the
// records expire.
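//
// A minimal caller sketch (hypothetical; the prov value and key contents are
// illustrative, not part of this package):
//
//	h, _ := mh.Sum([]byte("example block"), mh.SHA2_256, -1)
//	if err := prov.StartProviding(false, h); err != nil {
//		// provider closed or node offline; retry later
//	}
//	defer prov.Close()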
type DHTProvider interface {
	// StartProviding ensures keys are periodically advertised to the DHT swarm.
	//
	// If the `keys` aren't currently being reprovided, they are added to the
	// queue to be provided to the DHT swarm as soon as possible, and scheduled
	// to be reprovided periodically. If `force` is set to true, all keys are
	// provided to the DHT swarm, regardless of whether they were already being
	// reprovided in the past. `keys` keep being reprovided until `StopProviding`
	// is called.
	//
	// This operation is asynchronous: it returns as soon as the `keys` are
	// added to the provide queue, and the provides happen asynchronously.
	//
	// Returns an error if the keys couldn't be added to the provide queue. This
	// can happen if the provider is closed or if the node is currently Offline
	// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
	// The schedule and provide queue depend on the network size, hence recent
	// network connectivity is essential.
	StartProviding(force bool, keys ...mh.Multihash) error
	// ProvideOnce sends provider records for the specified keys to the DHT swarm
	// only once. It does not automatically reprovide those keys afterward.
	//
	// It adds the supplied multihashes to the provide queue and returns
	// immediately; the provide operation happens asynchronously.
	//
	// Returns an error if the keys couldn't be added to the provide queue. This
	// can happen if the provider is closed or if the node is currently Offline
	// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
	// The schedule and provide queue depend on the network size, hence recent
	// network connectivity is essential.
	ProvideOnce(keys ...mh.Multihash) error
	// Clear removes all keys from the provide queue and returns the number of
	// keys that were cleared.
	//
	// The keys are not deleted from the keystore, so they will continue to be
	// reprovided as scheduled.
	Clear() int
	// RefreshSchedule scans the Keystore for any keys that are not currently
	// scheduled for reproviding. If such keys are found, it schedules their
	// associated keyspace region to be reprovided.
	//
	// This function doesn't remove prefixes that have no keys from the schedule.
	// That is done automatically during the reprovide operation if a region has
	// no keys.
	//
	// Returns an error if the provider is closed or if the node is currently
	// Offline (either never bootstrapped, or disconnected for more than
	// `OfflineDelay`). The schedule depends on the network size, hence recent
	// network connectivity is essential.
	RefreshSchedule() error
	Close() error
}

var (
	_ DHTProvider = &ddhtprovider.SweepingProvider{}
	_ DHTProvider = &dhtprovider.SweepingProvider{}
	_ DHTProvider = &NoopProvider{}
	_ DHTProvider = &LegacyProvider{}
)

// NoopProvider is a no-operation provider implementation that does nothing.
// It is used when providing is disabled or when no DHT is available.
// All methods return successfully without performing any actual operations.
type NoopProvider struct{}

func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) error { return nil }
func (r *NoopProvider) ProvideOnce(...mh.Multihash) error          { return nil }
func (r *NoopProvider) Clear() int                                 { return 0 }
func (r *NoopProvider) RefreshSchedule() error                     { return nil }
func (r *NoopProvider) Close() error                               { return nil }

// LegacyProvider is a wrapper around the boxo/provider.System that implements
// the DHTProvider interface. This provider manages reprovides using a burst
// strategy: it sequentially reprovides all keys at once during each reprovide
// interval, rather than spreading the load over time.
//
// This is the legacy provider implementation and can cause resource spikes
// during reprovide operations. For more efficient providing, consider using
// the SweepingProvider, which spreads the load over the reprovide interval.
type LegacyProvider struct {
	provider.System
}

func (r *LegacyProvider) StartProviding(force bool, keys ...mh.Multihash) error {
	return r.ProvideOnce(keys...)
}

func (r *LegacyProvider) ProvideOnce(keys ...mh.Multihash) error {
	if many, ok := r.System.(routinghelpers.ProvideManyRouter); ok {
		return many.ProvideMany(context.Background(), keys)
	}

	for _, k := range keys {
		if err := r.Provide(context.Background(), cid.NewCidV1(cid.Raw, k), true); err != nil {
			return err
		}
	}
	return nil
}

func (r *LegacyProvider) Clear() int {
	return r.System.Clear()
}

func (r *LegacyProvider) RefreshSchedule() error { return nil }

// LegacyProviderOpt creates a LegacyProvider to be used as the provider in the
// IpfsNode.
func LegacyProviderOpt(reprovideInterval time.Duration, strategy string, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
	system := fx.Provide(
		fx.Annotate(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (*LegacyProvider, error) {
			// Initialize provider.System first, before pinner/blockstore/etc.
			// The KeyChanFunc will be set later via SetKeyProvider() once we have
			// created the pinner, blockstore and other dependencies.
			opts := []provider.Option{
				provider.Online(cr),
				provider.ReproviderInterval(reprovideInterval),
				provider.ProvideWorkerCount(provideWorkerCount),
			}
			if !acceleratedDHTClient && reprovideInterval > 0 {
				// The estimate is poor when running with the accelerated DHT
				// client, and since this message is just trying to push people
				// toward the accelerated DHT client, don't report throughput
				// if it is already in use.
				opts = append(opts,
					provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
						avgProvideSpeed := duration / time.Duration(keysProvided)
						count := uint64(keysProvided)

						if !reprovide || !complete {
							// We don't know how many CIDs we have to provide, so try to fetch
							// the total from the blockstore. Don't try for too long, as this
							// might be very expensive on a huge datastore.
							ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
							defer cancel()

							// FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup.
							// Note: talk to the datastore directly, so as not to depend on the Blockstore here.
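							// The query below walks raw block keys under the blockstore
							// prefix (KeysOnly, so values are never read); the resulting
							// count is the number of stored blocks, not the number of
							// CIDs matched by the current provide strategy.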
							qr, err := repo.Datastore().Query(ctx, query.Query{
								Prefix:   blockstore.BlockPrefix.String(),
								KeysOnly: true,
							})
							if err != nil {
								logger.Errorf("fetching AllKeysChan in provider ThroughputReport: %v", err)
								return false
							}
							defer qr.Close()
							count = 0
						countLoop:
							for {
								select {
								case _, ok := <-qr.Next():
									if !ok {
										break countLoop
									}
									count++
								case <-ctx.Done():
									// Really big blockstore mode.

									// How many blocks would be in a 10TiB blockstore with 128KiB blocks.
									const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
									// The per-block time budget this leaves us.
									expectedProvideSpeed := reprovideInterval / probableBigBlockstore
									if avgProvideSpeed > expectedProvideSpeed {
										logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node may be falling behind on DHT reprovides, which could affect content availability.

Observed:  %d keys at %v per key
Estimated: assuming a 10TiB blockstore, it would take %v to complete
⏰ Must finish within %v (Provide.DHT.Interval)

Solutions (try in order):
1. Enable Provide.DHT.SweepEnabled=true (recommended)
2. Increase Provide.DHT.MaxWorkers if needed
3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)

Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
											keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval)
										return false
									}
								}
							}
						}

						// The per-block time budget this leaves us.
						expectedProvideSpeed := reprovideInterval
						if count > 0 {
							expectedProvideSpeed = reprovideInterval / time.Duration(count)
						}

						if avgProvideSpeed > expectedProvideSpeed {
							logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node is falling behind on DHT reprovides, which will affect content availability.

Observed:  %d keys at %v per key
Confirmed: ~%d total CIDs requiring %v to complete
⏰ Must finish within %v (Provide.DHT.Interval)

Solutions (try in order):
1. Enable Provide.DHT.SweepEnabled=true (recommended)
2. Increase Provide.DHT.MaxWorkers if needed
3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)

Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
								keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval)
						}
						return false
					}, sampledBatchSize))
			}

			sys, err := provider.New(repo.Datastore(), opts...)
			if err != nil {
				return nil, err
			}
			lc.Append(fx.Hook{
				OnStop: func(ctx context.Context) error {
					return sys.Close()
				},
			})

			prov := &LegacyProvider{sys}
			handleStrategyChange(strategy, prov, repo.Datastore())

			return prov, nil
		},
			fx.As(new(provider.System)),
			fx.As(new(DHTProvider)),
		),
	)
	setKeyProvider := fx.Invoke(func(lc fx.Lifecycle, system provider.System, keyProvider provider.KeyChanFunc) {
		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				// SetKeyProvider breaks the circular dependency between provider, blockstore, and pinner.
				// We cannot create the blockstore without the provider (it needs to provide blocks),
				// and we cannot determine the reproviding strategy without the pinner/blockstore.
				// This deferred initialization allows us to create provider.System first,
				// then set the actual key provider function after all dependencies are ready.
				system.SetKeyProvider(keyProvider)
				return nil
			},
		})
	})
	return fx.Options(
		system,
		setKeyProvider,
	)
}

type dhtImpl interface {
	routing.Routing
	GetClosestPeers(context.Context, string) ([]peer.ID, error)
	Host() host.Host
	MessageSender() dht_pb.MessageSender
}

type fullrtRouter struct {
	*fullrt.FullRT
	ready  bool
	logger *log.ZapEventLogger
}

func newFullRTRouter(fr *fullrt.FullRT, loggerName string) *fullrtRouter {
	return &fullrtRouter{
		FullRT: fr,
		ready:  true,
		logger: log.Logger(loggerName),
	}
}

// GetClosestPeers overrides fullrt.FullRT's GetClosestPeers and returns an
// error if the fullrt's initial network crawl isn't complete yet.
func (fr *fullrtRouter) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
	if fr.ready {
		if !fr.Ready() {
			fr.ready = false
			fr.logger.Info("AcceleratedDHTClient: waiting for routing table initialization (5-10 min, depends on DHT size and network) to complete before providing")
			return nil, errAcceleratedDHTNotReady
		}
	} else {
		if fr.Ready() {
			fr.ready = true
			fr.logger.Info("AcceleratedDHTClient: routing table ready, providing can begin")
		} else {
			return nil, errAcceleratedDHTNotReady
		}
	}
	return fr.FullRT.GetClosestPeers(ctx, key)
}

var (
	_ dhtImpl = &dht.IpfsDHT{}
	_ dhtImpl = &fullrtRouter{}
)

type addrsFilter interface {
	FilteredAddrs() []ma.Multiaddr
}

// findRootDatastoreSpec extracts the leaf datastore spec for the root ("/")
// mount from the repo's Datastore.Spec config. It unwraps mount (picks the "/"
// mountpoint), measure, and log wrappers to find the actual backend spec
// (e.g., levelds, pebbleds).
func findRootDatastoreSpec(spec map[string]any) map[string]any {
	if spec == nil {
		return nil
	}
	switch spec["type"] {
	case "mount":
		mounts, ok := spec["mounts"].([]any)
		if !ok {
			return spec
		}
		for _, m := range mounts {
			mnt, ok := m.(map[string]any)
			if !ok {
				continue
			}
			if mnt["mountpoint"] == "/" {
				return findRootDatastoreSpec(mnt)
			}
		}
		// No root mount found; return nil so callers fall back gracefully
		// (in-memory datastore or skip mounting) rather than passing a
		// mount-type spec to openDatastoreAt, which expects a leaf backend.
		return nil
	case "measure", "log":
		if child, ok := spec["child"].(map[string]any); ok {
			return findRootDatastoreSpec(child)
		}
		return spec
	default:
		if _, hasChild := spec["child"]; hasChild {
			logger.Warnw("unrecognized datastore wrapper type, using as-is",
				"type", spec["type"])
		}
		return spec
	}
}

// MountKeystoreDatastores opens any provider keystore datastores that exist on
// disk and returns them as mount.Mount entries ready to be combined with the
// main repo datastore. The caller must call the returned cleanup function when
// done. Returns nil mounts and a no-op closer if no keystores exist.
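//
// A hedged usage sketch (variable names are illustrative):
//
//	mounts, closer, err := MountKeystoreDatastores(r)
//	if err != nil {
//		return err
//	}
//	defer closer()
//	// Combine with the main repo datastore; mount picks the longest matching
//	// prefix, so the /provider/keystore/<suffix> mounts take priority.
//	combined := mount.New(append(mounts, mount.Mount{
//		Prefix:    datastore.NewKey("/"),
//		Datastore: r.Datastore(),
//	}))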
func MountKeystoreDatastores(repo repo.Repo) ([]mount.Mount, func(), error) {
	cfg, err := repo.Config()
	if err != nil {
		return nil, nil, fmt.Errorf("reading repo config: %w", err)
	}

	rootSpec := findRootDatastoreSpec(cfg.Datastore.Spec)
	if rootSpec == nil {
		return nil, func() {}, nil
	}

	keystoreBasePath := filepath.Join(repo.Path(), KeystoreDatastorePath)
	var mounts []mount.Mount
	var closers []func()

	for _, suffix := range []string{"0", "1"} {
		dir := filepath.Join(keystoreBasePath, suffix)
		if _, err := os.Stat(dir); err != nil {
			continue
		}
		ds, err := openDatastoreAt(rootSpec, dir)
		if err != nil {
			for _, c := range closers {
				c()
			}
			return nil, nil, err
		}
		prefix := providerDatastoreKey.Child(keystoreDatastoreKey).ChildString(suffix)
		mounts = append(mounts, mount.Mount{Prefix: prefix, Datastore: ds})
		closers = append(closers, func() { ds.Close() })
	}

	closer := func() {
		for _, c := range closers {
			c()
		}
	}
	return mounts, closer, nil
}

// openDatastoreAt opens a datastore using the given spec at the specified path.
// It deep-copies the spec to avoid mutating the original.
func openDatastoreAt(rootSpec map[string]any, path string) (datastore.Batching, error) {
	spec := copySpec(rootSpec)
	spec["path"] = path
	dsc, err := fsrepo.AnyDatastoreConfig(spec)
	if err != nil {
		return nil, fmt.Errorf("creating datastore config for %s: %w", path, err)
	}
	return dsc.Create("")
}

// copySpec deep-copies a datastore spec map so modifications (e.g., changing
// the path) don't affect the original.
func copySpec(spec map[string]any) map[string]any {
	if spec == nil {
		return nil
	}
	cp := make(map[string]any, len(spec))
	for k, v := range spec {
		switch val := v.(type) {
		case map[string]any:
			cp[k] = copySpec(val)
		case []any:
			s := make([]any, len(val))
			for i, elem := range val {
				if m, ok := elem.(map[string]any); ok {
					s[i] = copySpec(m)
				} else {
					s[i] = elem
				}
			}
			cp[k] = s
		default:
			cp[k] = v
		}
	}
	return cp
}

// purgeBatchSize is the number of keys deleted per batch commit during
// orphaned keystore cleanup. Each commit is a cancellation checkpoint.
const purgeBatchSize = 1 << 12 // 4096

// purgeOrphanedKeystoreData deletes all keys under /provider/keystore/ from the
// shared repo datastore. These were written by older Kubo versions that stored
// provider keystore data inline in the shared datastore. The new code uses
// separate filesystem datastores under <repo>/{KeystoreDatastorePath}/ instead.
//
// The operation is idempotent and safe to interrupt: partial completion is
// fine because already-deleted keys are no-ops on re-run.
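//
// A minimal invocation sketch (assuming a cancellable ctx wired to shutdown;
// names are illustrative):
//
//	if err := purgeOrphanedKeystoreData(ctx, repoDS); err != nil {
//		// Already-committed deletions persist, so a retry on the next
//		// start simply continues where this run stopped.
//	}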
func purgeOrphanedKeystoreData(ctx context.Context, ds datastore.Batching) error {
	orphanedPrefix := providerDatastoreKey.Child(keystoreDatastoreKey).String()
	syncKey := datastore.NewKey(orphanedPrefix)

	results, err := ds.Query(ctx, query.Query{
		Prefix:   orphanedPrefix,
		KeysOnly: true,
	})
	if err != nil {
		return fmt.Errorf("querying orphaned keystore data: %w", err)
	}
	defer results.Close()

	var batch datastore.Batch
	var count, pending int
	for result := range results.Next() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if result.Error != nil {
			return fmt.Errorf("iterating orphaned keystore data: %w", result.Error)
		}
		if batch == nil {
			batch, err = ds.Batch(ctx)
			if err != nil {
				return fmt.Errorf("creating batch for orphaned keystore cleanup: %w", err)
			}
		}
		if err := batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
			return fmt.Errorf("batch deleting orphaned key %s: %w", result.Key, err)
		}
		count++
		pending++
		if pending >= purgeBatchSize {
			if err := batch.Commit(ctx); err != nil {
				return fmt.Errorf("committing orphaned keystore cleanup batch: %w", err)
			}
			if err := ds.Sync(ctx, syncKey); err != nil {
				return fmt.Errorf("syncing orphaned keystore cleanup: %w", err)
			}
			batch = nil
			pending = 0
		}
	}
	if pending > 0 {
		if err := batch.Commit(ctx); err != nil {
			return fmt.Errorf("committing orphaned keystore cleanup batch: %w", err)
		}
		if err := ds.Sync(ctx, syncKey); err != nil {
			return fmt.Errorf("syncing orphaned keystore cleanup: %w", err)
		}
	}
	if count > 0 {
		logger.Infow("purged orphaned provider keystore data from shared datastore", "keys", count)
	}
	return nil
}

// SweepingProviderOpt wires up the sweeping DHT provider (used when
// Provide.DHT.SweepEnabled=true), which spreads reprovides over the reprovide
// interval instead of bursting them all at once.
func SweepingProviderOpt(cfg *config.Config) fx.Option {
	reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
	type providerInput struct {
		fx.In
		DHT  routing.Routing `name:"dhtc"`
		Repo repo.Repo
		Lc   fx.Lifecycle
	}
	sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
		ds := namespace.Wrap(in.Repo.Datastore(), providerDatastoreKey)

		// Get the repo path and config to determine the datastore type.
		repoPath := in.Repo.Path()
		repoCfg, err := in.Repo.Config()
		if err != nil {
			return nil, nil, fmt.Errorf("getting repo config: %w", err)
		}

		// Find the root datastore type (levelds, pebbleds, etc.).
		rootSpec := findRootDatastoreSpec(repoCfg.Datastore.Spec)

		// Keystore datastores live at <repo>/{KeystoreDatastorePath}/<suffix>.
		keystoreBasePath := filepath.Join(repoPath, KeystoreDatastorePath)

		createDs := func(suffix string) (datastore.Batching, error) {
			if err := validateKeystoreSuffix(suffix); err != nil {
				return nil, err
			}
			// When no datastore spec is configured (e.g., test/mock repos),
			// fall back to an in-memory datastore.
			if rootSpec == nil {
				return datastore.NewMapDatastore(), nil
			}
			if err := os.MkdirAll(keystoreBasePath, 0o755); err != nil {
				return nil, fmt.Errorf("creating keystore base directory: %w", err)
			}
			ds, err := openDatastoreAt(rootSpec, filepath.Join(keystoreBasePath, suffix))
			if err != nil {
				return nil, err
			}
			logger.Infow("provider keystore: opened datastore", "suffix", suffix, "path", filepath.Join(keystoreBasePath, suffix))
			return ds, nil
		}

		destroyDs := func(suffix string) error {
			if err := validateKeystoreSuffix(suffix); err != nil {
				return err
			}
			logger.Infow("provider keystore: removing datastore from disk", "suffix", suffix, "path", filepath.Join(keystoreBasePath, suffix))
			return os.RemoveAll(filepath.Join(keystoreBasePath, suffix))
		}

		// One-time cleanup of stale keystore data left by older Kubo in the
		// shared repo datastore under /provider/keystore/. New code stores
		// bulk key data in separate filesystem datastores under
		// <repo>/{KeystoreDatastorePath}/ while still using the same
		// /provider/keystore/ namespace in the shared datastore for metadata.
		//
		// The absence of the keystoreBasePath directory signals a first run
		// after upgrade: the directory is created later by createDs on first
		// use, so it doubles as a "cleanup done" flag. If the process dies
		// mid-purge, the directory still won't exist and the cleanup re-runs
		// on the next start (it is idempotent). This must run synchronously
		// before NewResettableKeystore to avoid racing with reads on the same
		// namespace.
		if _, statErr := os.Stat(keystoreBasePath); os.IsNotExist(statErr) {
			logger.Infow("migrating provider keystore data from shared datastore to separate filesystem datastores", "path", keystoreBasePath)
			// Create a cancellable context for the purge. The OnStop hook
			// below calls purgeCancel when the node receives a shutdown
			// signal (e.g., SIGINT), which interrupts the purge loop
			// instead of blocking indefinitely.
			purgeCtx, purgeCancel := context.WithCancel(context.Background())
			in.Lc.Append(fx.Hook{
				OnStop: func(_ context.Context) error {
					purgeCancel()
					return nil
				},
			})
			if purgeErr := purgeOrphanedKeystoreData(purgeCtx, in.Repo.Datastore()); purgeErr != nil {
				if purgeCtx.Err() != nil {
					logger.Infow("provider keystore migration interrupted by shutdown, will resume on next start")
				} else {
					logger.Warnw("provider keystore migration failed, will retry on next start", "error", purgeErr)
				}
			} else {
				logger.Infow("provider keystore migration completed")
			}
			purgeCancel()
		}

		keystoreDs := namespace.Wrap(ds, keystoreDatastoreKey)
		ks, err := keystore.NewResettableKeystore(keystoreDs,
			keystore.WithDatastoreFactory(createDs, destroyDs),
			keystore.KeystoreOption(
				keystore.WithPrefixBits(16),
				keystore.WithBatchSize(int(cfg.Provide.DHT.KeystoreBatchSize.WithDefault(config.DefaultProvideDHTKeystoreBatchSize))),
			),
		)
		if err != nil {
			return nil, nil, err
		}
		// Constants for the buffered provider configuration.
		// These values match the upstream defaults from go-libp2p-kad-dht and
		// have been battle-tested.
		const (
			// bufferedDsName is the datastore namespace used by the buffered provider.
			// The dsqueue persists operations here to handle large data additions
			// without being memory-bound, allowing operation on hardware with
			// limited RAM and enabling core operations to return instantly while
			// processing happens asynchronously.
			bufferedDsName = "bprov"

			// bufferedBatchSize controls how many operations are dequeued and
			// processed together from the datastore queue. The worker processes
			// up to this many operations at once, grouping them by type for
			// efficiency.
			bufferedBatchSize = 1 << 10 // 1024 items

			// bufferedIdleWriteTime is an implementation detail of go-dsqueue that
			// controls how long the datastore buffer waits for new multihashes to
			// arrive before flushing in-memory items to the datastore. This does
			// NOT affect providing speed: provides happen as fast as possible via
			// a dedicated worker that continuously processes the queue regardless
			// of this timing.
			bufferedIdleWriteTime = time.Minute

			// loggerName is the name of the go-log logger used by the provider.
			loggerName = dhtprovider.DefaultLoggerName
		)

		bufferedProviderOpts := []buffered.Option{
			buffered.WithBatchSize(bufferedBatchSize),
			buffered.WithDsName(bufferedDsName),
			buffered.WithIdleWriteTime(bufferedIdleWriteTime),
		}
		var impl dhtImpl
		switch inDht := in.DHT.(type) {
		case *dht.IpfsDHT:
			if inDht != nil {
				impl = inDht
			}
		case *dual.DHT:
			if inDht != nil {
				prov, err := ddhtprovider.New(inDht,
					ddhtprovider.WithKeystore(ks),
					ddhtprovider.WithDatastore(ds),
					ddhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),

					ddhtprovider.WithReprovideInterval(reprovideInterval),
					ddhtprovider.WithMaxReprovideDelay(time.Hour),
					ddhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
					ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),

					ddhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
					ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
					ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
					ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),

					ddhtprovider.WithLoggerName(loggerName),
				)
				if err != nil {
					return nil, nil, err
				}
				return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
			}
		case *fullrt.FullRT:
			if inDht != nil {
				impl = newFullRTRouter(inDht, loggerName)
			}
		}
		if impl == nil {
			return &NoopProvider{}, nil, nil
		}

		var selfAddrsFunc func() []ma.Multiaddr
		if implFilter, ok := impl.(addrsFilter); ok {
			selfAddrsFunc = implFilter.FilteredAddrs
		} else {
			selfAddrsFunc = func() []ma.Multiaddr { return impl.Host().Addrs() }
		}
		opts := []dhtprovider.Option{
			dhtprovider.WithKeystore(ks),
			dhtprovider.WithDatastore(ds),
			dhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),
			dhtprovider.WithHost(impl.Host()),
			dhtprovider.WithRouter(impl),
			dhtprovider.WithMessageSender(impl.MessageSender()),
			dhtprovider.WithSelfAddrs(selfAddrsFunc),
			dhtprovider.WithAddLocalRecord(func(h mh.Multihash) error {
				return impl.Provide(context.Background(), cid.NewCidV1(cid.Raw, h), false)
			}),

			dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
			dhtprovider.WithReprovideInterval(reprovideInterval),
			dhtprovider.WithMaxReprovideDelay(time.Hour),
			dhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
			dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),

			dhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
			dhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
			dhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
			dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),

			dhtprovider.WithLoggerName(loggerName),
		}

		prov, err := dhtprovider.New(opts...)
		if err != nil {
			return nil, nil, err
		}
		return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
	})

	type keystoreInput struct {
		fx.In
		Provider    DHTProvider
		Keystore    *keystore.ResettableKeystore
		KeyProvider provider.KeyChanFunc
	}
	initKeystore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
		// Skip keystore initialization for NoopProvider.
		if _, ok := in.Provider.(*NoopProvider); ok {
			return
		}

		var (
			cancel context.CancelFunc
			done   = make(chan struct{})
		)

		syncKeystore := func(ctx context.Context) error {
			kcf, err := in.KeyProvider(ctx)
			if err != nil {
				return err
			}
			if err := in.Keystore.ResetCids(ctx, kcf); err != nil {
				return err
			}
			if err := in.Provider.RefreshSchedule(); err != nil {
				logger.Infow("refreshing provider schedule", "err", err)
			}
			return nil
		}

		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				// Use the KeyProvider as a garbage-collection function for the
				// keystore: periodically purge the keystore of all its keys and
				// replace them with the keys that need to be reprovided, coming
				// from the KeyChanFunc. So far, this is the least bad way to
				// remove CIDs that shouldn't be reprovided from the provider's
				// state.
				go func() {
					// Sync the keystore once at startup. This operation is async
					// since we need to walk the DAG of objects matching the
					// provide strategy, which can take a while.
					strategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
					logger.Infow("provider keystore sync started", "strategy", strategy)
					if err := syncKeystore(ctx); err != nil {
						if ctx.Err() == nil {
							logger.Errorw("provider keystore sync failed", "err", err, "strategy", strategy)
						} else {
							logger.Debugw("provider keystore sync interrupted by shutdown", "err", err, "strategy", strategy)
						}
						return
					}
					logger.Infow("provider keystore sync completed", "strategy", strategy)
				}()

				gcCtx, c := context.WithCancel(context.Background())
				cancel = c

				go func() { // garbage-collection loop for CIDs to reprovide
					defer close(done)
					ticker := time.NewTicker(reprovideInterval)
					defer ticker.Stop()

					for {
						select {
						case <-gcCtx.Done():
							return
						case <-ticker.C:
							if err := syncKeystore(gcCtx); err != nil {
								logger.Errorw("provider keystore sync", "err", err)
							}
						}
					}
				}()
				return nil
			},
			OnStop: func(ctx context.Context) error {
				if cancel != nil {
					cancel()
				}
				select {
				case <-done:
				case <-ctx.Done():
					return ctx.Err()
				}
				// The keystore will be closed by the ensureProviderClosesBeforeKeystore
				// hook, which guarantees the provider closes before the keystore.
				return nil
			},
		})
	})

	// ensureProviderClosesBeforeKeystore manages the shutdown order between
	// the provider and the keystore to prevent race conditions.
	//
	// The provider's worker goroutines may call keystore methods during their
	// operation. If the keystore closes while these operations are in flight,
	// we get "keystore is closed" errors. By closing the provider first, we
	// ensure all worker goroutines exit and complete any pending keystore
	// operations before the keystore itself closes.
	type providerKeystoreShutdownInput struct {
		fx.In
		Provider DHTProvider
		Keystore *keystore.ResettableKeystore
	}
	ensureProviderClosesBeforeKeystore := fx.Invoke(func(lc fx.Lifecycle, in providerKeystoreShutdownInput) {
		// Skip for NoopProvider.
		if _, ok := in.Provider.(*NoopProvider); ok {
			return
		}

		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				// Close the provider first: this waits for all worker goroutines
				// to exit, ensuring no code can access the keystore after it
				// returns.
				if err := in.Provider.Close(); err != nil {
					logger.Errorw("error closing provider during shutdown", "error", err)
				}

				// Closing the keystore is safe now that the provider is fully
				// shut down.
				return in.Keystore.Close()
			},
		})
	})

	// extractSweepingProvider extracts a SweepingProvider from the given
	// provider interface. It handles unwrapping buffered and dual providers,
	// always selecting WAN for the dual DHT. Returns nil if the provider is
	// not a sweeping provider type.
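	// It is declared as a self-referencing closure (var first, then
	// assignment) rather than a named top-level function so it can recurse
	// while staying local to SweepingProviderOpt.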
	var extractSweepingProvider func(prov any) *dhtprovider.SweepingProvider
	extractSweepingProvider = func(prov any) *dhtprovider.SweepingProvider {
		switch p := prov.(type) {
		case *dhtprovider.SweepingProvider:
			return p
		case *ddhtprovider.SweepingProvider:
			return p.WAN
		case *buffered.SweepingProvider:
			// Recursively extract from the inner provider.
			return extractSweepingProvider(p.Provider)
		default:
			return nil
		}
	}

	type alertInput struct {
		fx.In
		Provider DHTProvider
	}
	reprovideAlert := fx.Invoke(func(lc fx.Lifecycle, in alertInput) {
		prov := extractSweepingProvider(in.Provider)
		if prov == nil {
			return
		}

		var (
			cancel context.CancelFunc
			done   = make(chan struct{})
		)

		lc.Append(fx.Hook{
			OnStart: func(ctx context.Context) error {
				gcCtx, c := context.WithCancel(context.Background())
				cancel = c
				go func() {
					defer close(done)

					ticker := time.NewTicker(reprovideAlertPollInterval)
					defer ticker.Stop()

					var (
						queueSize, prevQueueSize         int64
						queuedWorkers, prevQueuedWorkers bool
						count                            int
					)

					for {
						select {
						case <-gcCtx.Done():
							return
						case <-ticker.C:
						}

						stats := prov.Stats()
						queuedWorkers = stats.Workers.QueuedPeriodic > 0
						queueSize = int64(stats.Queues.PendingRegionReprovides)

						// Alert if the reprovide queue keeps growing and all
						// periodic workers are busy. Requires
						// consecutiveAlertsThreshold intervals of sustained growth.
						if prevQueuedWorkers && queuedWorkers && queueSize > prevQueueSize {
							count++
							if count >= consecutiveAlertsThreshold {
								logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node is falling behind on DHT reprovides, which will affect content availability.

Keyspace regions enqueued for reprovide:
%s ago:	%d
Now:	%d

All periodic workers are busy!
Active workers:	%d / %d (max)
Active worker types:	%d periodic, %d burst
Dedicated workers:	%d periodic, %d burst

Solutions (try in order):
1. Increase Provide.DHT.MaxWorkers (current %d)
2. Increase Provide.DHT.DedicatedPeriodicWorkers (current %d)
3. Set Provide.DHT.SweepEnabled=false and Routing.AcceleratedDHTClient=true (last resort, not recommended)

Watch the reprovide queue being processed in real time with 'watch ipfs provide stat --all --compact'

See docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtmaxworkers`,
									reprovideAlertPollInterval.Truncate(time.Minute).String(), prevQueueSize, queueSize,
									stats.Workers.Active, stats.Workers.Max,
									stats.Workers.ActivePeriodic, stats.Workers.ActiveBurst,
									stats.Workers.DedicatedPeriodic, stats.Workers.DedicatedBurst,
									stats.Workers.Max, stats.Workers.DedicatedPeriodic)
							}
						} else if !queuedWorkers {
							count = 0
						}

						prevQueueSize, prevQueuedWorkers = queueSize, queuedWorkers
					}
				}()
				return nil
			},
			OnStop: func(ctx context.Context) error {
				// Cancel the alert loop.
				if cancel != nil {
					cancel()
				}
				select {
				case <-done:
				case <-ctx.Done():
					return ctx.Err()
				}
				return nil
			},
		})
	})

	return fx.Options(
		sweepingReprovider,
		initKeystore,
		ensureProviderClosesBeforeKeystore,
		reprovideAlert,
	)
}

// ONLINE/OFFLINE

// hasDHTRouting checks if the routing configuration includes a DHT component.
// It returns false for HTTP-only custom routing configurations (e.g.,
// Routing.Type="custom" with only HTTP routers). This is used to determine
// whether SweepingProviderOpt can be used, since it requires a DHT client.
func hasDHTRouting(cfg *config.Config) bool {
	routingType := cfg.Routing.Type.WithDefault(config.DefaultRoutingType)
	switch routingType {
	case "auto", "autoclient", "dht", "dhtclient", "dhtserver":
		return true
	case "custom":
		// Check if any router in the custom config is DHT-based.
		for _, router := range cfg.Routing.Routers {
			if routerIncludesDHT(router, cfg) {
				return true
			}
		}
		return false
	default: // "none", "delegated"
		return false
	}
}

// routerIncludesDHT recursively checks if a router configuration includes a DHT.
// It handles parallel and sequential composite routers by checking their children.
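//
// For example (a hedged sketch of the config shape, not a complete file), a
// custom configuration like the following includes a DHT via a parallel
// composite router, so the check returns true:
//
//	"Routing": {
//		"Type": "custom",
//		"Routers": {
//			"Composite": {
//				"Type": "parallel",
//				"Parameters": {
//					"Routers": [
//						{"RouterName": "WanDHT"},
//						{"RouterName": "HTTPRouter"}
//					]
//				}
//			},
//			"WanDHT": {"Type": "dht", ...},
//			"HTTPRouter": {"Type": "http", ...}
//		}
//	}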
func routerIncludesDHT(rp config.RouterParser, cfg *config.Config) bool {
	switch rp.Type {
	case config.RouterTypeDHT:
		return true
	case config.RouterTypeParallel, config.RouterTypeSequential:
		if children, ok := rp.Parameters.(*config.ComposableRouterParams); ok {
			for _, child := range children.Routers {
				if childRouter, exists := cfg.Routing.Routers[child.RouterName]; exists {
					if routerIncludesDHT(childRouter, cfg) {
						return true
					}
				}
			}
		}
	}
	return false
}

// OnlineProviders groups units managing provide routing records online.
func OnlineProviders(provide bool, cfg *config.Config) fx.Option {
	if !provide {
		return OfflineProviders()
	}

	providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)

	strategyFlag := config.ParseProvideStrategy(providerStrategy)
	if strategyFlag == 0 {
		return fx.Error(fmt.Errorf("provider: unknown strategy %q", providerStrategy))
	}

	opts := []fx.Option{
		fx.Provide(setReproviderKeyProvider(providerStrategy)),
	}

	sweepEnabled := cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled)
	dhtAvailable := hasDHTRouting(cfg)

	// Use SweepingProvider only when sweep is enabled AND a DHT is available.
	// For HTTP-only routing (e.g., Routing.Type="custom" with only HTTP routers),
	// fall back to LegacyProvider, which works with ProvideManyRouter.
	// See https://github.com/ipfs/kubo/issues/11089
	if sweepEnabled && dhtAvailable {
		opts = append(opts, SweepingProviderOpt(cfg))
	} else {
		reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
		acceleratedDHTClient := cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient)
		provideWorkerCount := int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))

		opts = append(opts, LegacyProviderOpt(reprovideInterval, providerStrategy, acceleratedDHTClient, provideWorkerCount))
	}

	return fx.Options(opts...)
}

// OfflineProviders groups units managing provide routing records offline.
func OfflineProviders() fx.Option {
	return fx.Provide(func() DHTProvider {
		return &NoopProvider{}
	})
}

func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFunc {
	return func(ctx context.Context) (<-chan cid.Cid, error) {
		err := mfsRoot.FlushMemFree(ctx)
		if err != nil {
			return nil, fmt.Errorf("provider: error flushing MFS, cannot provide MFS: %w", err)
		}
		rootNode, err := mfsRoot.GetDirectory().GetNode()
		if err != nil {
			return nil, fmt.Errorf("provider: error loading MFS root, cannot provide MFS: %w", err)
		}

		kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher)
		return kcf(ctx)
	}
}

type provStrategyIn struct {
	fx.In
	Pinner               pin.Pinner
	Blockstore           blockstore.Blockstore
	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
	OfflineUnixFSFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
	MFSRoot              *mfs.Root
	Repo                 repo.Repo
}

type provStrategyOut struct {
	fx.Out
	ProvidingStrategy    config.ProvideStrategy
	ProvidingKeyChanFunc provider.KeyChanFunc
}

// createKeyProvider creates the appropriate KeyChanFunc based on the strategy.
// Each strategy has different behavior:
//   - "roots": only the root CIDs of pinned content
//   - "pinned": all pinned content (roots + children)
//   - "mfs": only MFS content
//   - "all": all blocks
func createKeyProvider(strategyFlag config.ProvideStrategy, in provStrategyIn) provider.KeyChanFunc {
	switch strategyFlag {
	case config.ProvideStrategyRoots:
		return provider.NewBufferedProvider(dspinner.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher))
	case config.ProvideStrategyPinned:
		return provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher))
	case config.ProvideStrategyPinned | config.ProvideStrategyMFS:
		return provider.NewPrioritizedProvider(
			provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)),
			mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher),
		)
	case config.ProvideStrategyMFS:
		return mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher)
	default: // "all", "", "flat" (compat)
		return in.Blockstore.AllKeysChan
	}
}

// detectStrategyChange checks if the reproviding strategy has changed from
// what is persisted. It returns the previous strategy, whether it changed,
// and any datastore error.
func detectStrategyChange(ctx context.Context, strategy string, ds datastore.Datastore) (string, bool, error) {
	strategyKey := datastore.NewKey(reprovideStrategyKey)

	prev, err := ds.Get(ctx, strategyKey)
	if err != nil {
		if errors.Is(err, datastore.ErrNotFound) {
			return "", strategy != "", nil
		}
		return "", false, err
	}

	previousStrategy := string(prev)
	return previousStrategy, previousStrategy != strategy, nil
}

// persistStrategy saves the current reproviding strategy to the datastore.
// Empty-string strategies are deleted rather than stored.
func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastore) error {
	strategyKey := datastore.NewKey(reprovideStrategyKey)

	if strategy == "" {
		return ds.Delete(ctx, strategyKey)
	}
	return ds.Put(ctx, strategyKey, []byte(strategy))
}

// handleStrategyChange manages strategy change detection and queue clearing.
// When the reproviding strategy changes, we clear the provide queue to avoid
// unexpected behavior from mixing strategies. This ensures a clean transition
// between different providing modes.
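//
// For example, switching Provide.Strategy from "all" to "pinned" clears any
// provides queued under the old strategy and persists "pinned" under
// /reprovideStrategy, so the next start compares against the new value.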
func handleStrategyChange(strategy string, provider DHTProvider, ds datastore.Datastore) {
	ctx := context.Background()

	previous, changed, err := detectStrategyChange(ctx, strategy, ds)
	if err != nil {
		logger.Errorw("cannot read previous reprovide strategy", "err", err)
		return
	}

	if !changed {
		return
	}

	logger.Infow("Provide.Strategy changed, clearing provide queue", "previous", previous, "current", strategy)
	provider.Clear()

	if err := persistStrategy(ctx, strategy, ds); err != nil {
		logger.Errorw("cannot update reprovide strategy", "err", err)
	}
}

func setReproviderKeyProvider(strategy string) func(in provStrategyIn) provStrategyOut {
	strategyFlag := config.ParseProvideStrategy(strategy)

	return func(in provStrategyIn) provStrategyOut {
		// Create the appropriate key provider based on the strategy.
		kcf := createKeyProvider(strategyFlag, in)
		return provStrategyOut{
			ProvidingStrategy:    strategyFlag,
			ProvidingKeyChanFunc: kcf,
		}
	}
}