/ client / rpc / dag.go
dag.go
  1  package rpc
  2  
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/ipfs/boxo/path"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	format "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/kubo/core/coreiface/options"
	multicodec "github.com/multiformats/go-multicodec"
)
 16  
 17  type (
 18  	httpNodeAdder        HttpApi
 19  	HttpDagServ          httpNodeAdder
 20  	pinningHttpNodeAdder httpNodeAdder
 21  )
 22  
 23  func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
 24  	r, err := api.core().Block().Get(ctx, path.FromCid(c))
 25  	if err != nil {
 26  		return nil, err
 27  	}
 28  
 29  	data, err := io.ReadAll(r)
 30  	if err != nil {
 31  		return nil, err
 32  	}
 33  
 34  	blk, err := blocks.NewBlockWithCid(data, c)
 35  	if err != nil {
 36  		return nil, err
 37  	}
 38  
 39  	return api.ipldDecoder.DecodeNode(ctx, blk)
 40  }
 41  
 42  func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption {
 43  	out := make(chan *format.NodeOption)
 44  
 45  	for _, c := range cids {
 46  		// TODO: Consider limiting concurrency of this somehow
 47  		go func(c cid.Cid) {
 48  			n, err := api.Get(ctx, c)
 49  
 50  			select {
 51  			case out <- &format.NodeOption{Node: n, Err: err}:
 52  			case <-ctx.Done():
 53  			}
 54  		}(c)
 55  	}
 56  	return out
 57  }
 58  
 59  func (api *httpNodeAdder) add(ctx context.Context, nd format.Node, pin bool) error {
 60  	c := nd.Cid()
 61  	prefix := c.Prefix()
 62  
 63  	// preserve 'cid-codec' when sent over HTTP
 64  	cidCodec := multicodec.Code(prefix.Codec).String()
 65  
 66  	// 'format' got replaced by 'cid-codec' in https://github.com/ipfs/interface-go-ipfs-core/pull/80
 67  	// but we still support it here for backward-compatibility with use of CIDv0
 68  	format := ""
 69  	if prefix.Version == 0 {
 70  		cidCodec = ""
 71  		format = "v0"
 72  	}
 73  
 74  	stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()),
 75  		options.Block.Hash(prefix.MhType, prefix.MhLength),
 76  		options.Block.CidCodec(cidCodec),
 77  		options.Block.Format(format),
 78  		options.Block.Pin(pin))
 79  	if err != nil {
 80  		return err
 81  	}
 82  	if !stat.Path().RootCid().Equals(c) {
 83  		return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().RootCid().String())
 84  	}
 85  	return nil
 86  }
 87  
 88  func (api *httpNodeAdder) addMany(ctx context.Context, nds []format.Node, pin bool) error {
 89  	for _, nd := range nds {
 90  		// TODO: optimize
 91  		if err := api.add(ctx, nd, pin); err != nil {
 92  			return err
 93  		}
 94  	}
 95  	return nil
 96  }
 97  
 98  func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error {
 99  	return (*httpNodeAdder)(api).addMany(ctx, nds, false)
100  }
101  
102  func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error {
103  	return (*httpNodeAdder)(api).add(ctx, nd, false)
104  }
105  
106  func (api *pinningHttpNodeAdder) Add(ctx context.Context, nd format.Node) error {
107  	return (*httpNodeAdder)(api).add(ctx, nd, true)
108  }
109  
110  func (api *pinningHttpNodeAdder) AddMany(ctx context.Context, nds []format.Node) error {
111  	return (*httpNodeAdder)(api).addMany(ctx, nds, true)
112  }
113  
114  func (api *HttpDagServ) Pinning() format.NodeAdder {
115  	return (*pinningHttpNodeAdder)(api)
116  }
117  
118  func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error {
119  	return api.core().Block().Rm(ctx, path.FromCid(c)) // TODO: should we force rm?
120  }
121  
122  func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
123  	for _, c := range cids {
124  		// TODO: optimize
125  		if err := api.Remove(ctx, c); err != nil {
126  			return err
127  		}
128  	}
129  	return nil
130  }
131  
132  func (api *httpNodeAdder) core() *HttpApi {
133  	return (*HttpApi)(api)
134  }
135  
136  func (api *HttpDagServ) core() *HttpApi {
137  	return (*HttpApi)(api)
138  }