// core/node/storage.go
 1  package node
 2  
 3  import (
 4  	blockstore "github.com/ipfs/boxo/blockstore"
 5  	"github.com/ipfs/go-datastore"
 6  	config "github.com/ipfs/kubo/config"
 7  	"go.uber.org/fx"
 8  
 9  	"github.com/ipfs/boxo/filestore"
10  	"github.com/ipfs/kubo/core/node/helpers"
11  	"github.com/ipfs/kubo/repo"
12  	"github.com/ipfs/kubo/thirdparty/verifbs"
13  )
14  
15  // RepoConfig loads configuration from the repo
16  func RepoConfig(repo repo.Repo) (*config.Config, error) {
17  	cfg, err := repo.Config()
18  	return cfg, err
19  }
20  
21  // Datastore provides the datastore
22  func Datastore(repo repo.Repo) datastore.Datastore {
23  	return repo.Datastore()
24  }
25  
// BaseBlocks is the lower-level blockstore without the GC or Filestore
// layers. GcBlockstoreCtor and FilestoreBlockstoreCtor wrap it to build
// the full blockstore stack.
type BaseBlocks blockstore.Blockstore
28  
29  // BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
30  func BaseBlockstoreCtor(
31  	cacheOpts blockstore.CacheOpts,
32  	hashOnRead bool,
33  	writeThrough bool,
34  	providingStrategy string,
35  ) func(mctx helpers.MetricsCtx, repo repo.Repo, prov DHTProvider, lc fx.Lifecycle) (bs BaseBlocks, err error) {
36  	return func(mctx helpers.MetricsCtx, repo repo.Repo, prov DHTProvider, lc fx.Lifecycle) (bs BaseBlocks, err error) {
37  		opts := []blockstore.Option{blockstore.WriteThrough(writeThrough)}
38  
39  		// Blockstore providing integration:
40  		// When strategy includes "all" the blockstore directly provides blocks as they're Put.
41  		// Important: Provide calls from blockstore are intentionally BLOCKING.
42  		// The Provider implementation (not the blockstore) should handle concurrency/queuing.
43  		// This avoids spawning unbounded goroutines for concurrent block additions.
44  		strategyFlag := config.ParseProvideStrategy(providingStrategy)
45  		if strategyFlag&config.ProvideStrategyAll != 0 {
46  			opts = append(opts, blockstore.Provider(prov))
47  		}
48  
49  		// hash security
50  		bs = blockstore.NewBlockstore(
51  			repo.Datastore(),
52  			opts...,
53  		)
54  		bs = &verifbs.VerifBS{Blockstore: bs}
55  		bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
56  		if err != nil {
57  			return nil, err
58  		}
59  
60  		bs = blockstore.NewIdStore(bs)
61  
62  		if hashOnRead {
63  			bs = &blockstore.ValidatingBlockstore{Blockstore: bs}
64  		}
65  
66  		return
67  	}
68  }
69  
70  // GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers
71  func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) {
72  	gclocker = blockstore.NewGCLocker()
73  	gcbs = blockstore.NewGCBlockstore(bb, gclocker)
74  
75  	bs = gcbs
76  	return
77  }
78  
79  // FilestoreBlockstoreCtor wraps GcBlockstore and adds Filestore support
80  func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks, prov DHTProvider) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
81  	gclocker = blockstore.NewGCLocker()
82  
83  	// hash security
84  	fstore = filestore.NewFilestore(bb, repo.FileManager(), prov)
85  	gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
86  	gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
87  
88  	bs = gcbs
89  	return
90  }