dag.go
1 package coreapi 2 3 import ( 4 "context" 5 6 dag "github.com/ipfs/boxo/ipld/merkledag" 7 pin "github.com/ipfs/boxo/pinning/pinner" 8 cid "github.com/ipfs/go-cid" 9 ipld "github.com/ipfs/go-ipld-format" 10 "go.opentelemetry.io/otel/attribute" 11 "go.opentelemetry.io/otel/trace" 12 13 "github.com/ipfs/kubo/tracing" 14 ) 15 16 type dagAPI struct { 17 ipld.DAGService 18 19 core *CoreAPI 20 } 21 22 type pinningAdder CoreAPI 23 24 func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error { 25 ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "Add", trace.WithAttributes(attribute.String("node", nd.String()))) 26 defer span.End() 27 defer adder.blockstore.PinLock(ctx).Unlock(ctx) 28 29 if err := adder.dag.Add(ctx, nd); err != nil { 30 return err 31 } 32 33 if err := adder.pinning.PinWithMode(ctx, nd.Cid(), pin.Recursive, ""); err != nil { 34 return err 35 } 36 37 return adder.pinning.Flush(ctx) 38 } 39 40 func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error { 41 ctx, span := tracing.Span(ctx, "CoreAPI.PinningAdder", "AddMany", trace.WithAttributes(attribute.Int("nodes.count", len(nds)))) 42 defer span.End() 43 defer adder.blockstore.PinLock(ctx).Unlock(ctx) 44 45 if err := adder.dag.AddMany(ctx, nds); err != nil { 46 return err 47 } 48 49 cids := cid.NewSet() 50 51 for _, nd := range nds { 52 c := nd.Cid() 53 if cids.Visit(c) { 54 if err := adder.pinning.PinWithMode(ctx, c, pin.Recursive, ""); err != nil { 55 return err 56 } 57 } 58 } 59 60 return adder.pinning.Flush(ctx) 61 } 62 63 func (api *dagAPI) Pinning() ipld.NodeAdder { 64 return (*pinningAdder)(api.core) 65 } 66 67 func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter { 68 return dag.NewSession(ctx, api.DAGService) 69 } 70 71 var ( 72 _ ipld.DAGService = (*dagAPI)(nil) 73 _ dag.SessionMaker = (*dagAPI)(nil) 74 )