/ core / coreapi / dag.go
dag.go
 1  package coreapi
 2  
 3  import (
 4  	"context"
 5  
 6  	dag "github.com/ipfs/boxo/ipld/merkledag"
 7  	pin "github.com/ipfs/boxo/pinning/pinner"
 8  	cid "github.com/ipfs/go-cid"
 9  	ipld "github.com/ipfs/go-ipld-format"
10  	"go.opentelemetry.io/otel/attribute"
11  	"go.opentelemetry.io/otel/trace"
12  
13  	"github.com/ipfs/kubo/tracing"
14  )
15  
// dagAPI exposes DAG operations for the core API. It embeds an
// ipld.DAGService, so that interface's methods are promoted onto dagAPI, and
// adds the Pinning and Session helpers defined in this file.
type dagAPI struct {
	// Embedded DAG service; also used as the backing service for sessions
	// created by Session.
	ipld.DAGService

	// core is the owning CoreAPI. Pinning converts it to a *pinningAdder to
	// build a node adder that pins what it adds.
	core *CoreAPI
}
21  
// pinningAdder is a distinct named type with CoreAPI as its underlying type.
// It shares CoreAPI's fields (the methods below use blockstore, dag and
// pinning) but carries its own method set: Add and AddMany, which store
// nodes and then pin them recursively.
type pinningAdder CoreAPI
23  
24  func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
25  	ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "Add", trace.WithAttributes(attribute.String("node", nd.String())))
26  	defer span.End()
27  	defer adder.blockstore.PinLock(ctx).Unlock(ctx)
28  
29  	if err := adder.dag.Add(ctx, nd); err != nil {
30  		return err
31  	}
32  
33  	if err := adder.pinning.PinWithMode(ctx, nd.Cid(), pin.Recursive, ""); err != nil {
34  		return err
35  	}
36  
37  	return adder.pinning.Flush(ctx)
38  }
39  
40  func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
41  	ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "AddMany", trace.WithAttributes(attribute.Int("nodes.count", len(nds))))
42  	defer span.End()
43  	defer adder.blockstore.PinLock(ctx).Unlock(ctx)
44  
45  	if err := adder.dag.AddMany(ctx, nds); err != nil {
46  		return err
47  	}
48  
49  	cids := cid.NewSet()
50  
51  	for _, nd := range nds {
52  		c := nd.Cid()
53  		if cids.Visit(c) {
54  			if err := adder.pinning.PinWithMode(ctx, c, pin.Recursive, ""); err != nil {
55  				return err
56  			}
57  		}
58  	}
59  
60  	return adder.pinning.Flush(ctx)
61  }
62  
63  func (api *dagAPI) Pinning() ipld.NodeAdder {
64  	return (*pinningAdder)(api.core)
65  }
66  
67  func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter {
68  	return dag.NewSession(ctx, api.DAGService)
69  }
70  
// Compile-time assertions that *dagAPI satisfies the interfaces it is meant
// to implement. They have no runtime effect; the build fails if a required
// method is missing.
var (
	_ ipld.DAGService  = (*dagAPI)(nil)
	_ dag.SessionMaker = (*dagAPI)(nil)
)