dag.go
1 package rpc 2 3 import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 9 "github.com/ipfs/boxo/path" 10 blocks "github.com/ipfs/go-block-format" 11 "github.com/ipfs/go-cid" 12 format "github.com/ipfs/go-ipld-format" 13 "github.com/ipfs/kubo/core/coreiface/options" 14 multicodec "github.com/multiformats/go-multicodec" 15 ) 16 17 type ( 18 httpNodeAdder HttpApi 19 HttpDagServ httpNodeAdder 20 pinningHttpNodeAdder httpNodeAdder 21 ) 22 23 func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) { 24 r, err := api.core().Block().Get(ctx, path.FromCid(c)) 25 if err != nil { 26 return nil, err 27 } 28 29 data, err := io.ReadAll(r) 30 if err != nil { 31 return nil, err 32 } 33 34 blk, err := blocks.NewBlockWithCid(data, c) 35 if err != nil { 36 return nil, err 37 } 38 39 return api.ipldDecoder.DecodeNode(ctx, blk) 40 } 41 42 func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { 43 out := make(chan *format.NodeOption) 44 45 for _, c := range cids { 46 // TODO: Consider limiting concurrency of this somehow 47 go func(c cid.Cid) { 48 n, err := api.Get(ctx, c) 49 50 select { 51 case out <- &format.NodeOption{Node: n, Err: err}: 52 case <-ctx.Done(): 53 } 54 }(c) 55 } 56 return out 57 } 58 59 func (api *httpNodeAdder) add(ctx context.Context, nd format.Node, pin bool) error { 60 c := nd.Cid() 61 prefix := c.Prefix() 62 63 // preserve 'cid-codec' when sent over HTTP 64 cidCodec := multicodec.Code(prefix.Codec).String() 65 66 // 'format' got replaced by 'cid-codec' in https://github.com/ipfs/interface-go-ipfs-core/pull/80 67 // but we still support it here for backward-compatibility with use of CIDv0 68 format := "" 69 if prefix.Version == 0 { 70 cidCodec = "" 71 format = "v0" 72 } 73 74 stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()), 75 options.Block.Hash(prefix.MhType, prefix.MhLength), 76 options.Block.CidCodec(cidCodec), 77 options.Block.Format(format), 78 options.Block.Pin(pin)) 79 if err != nil { 80 return err 81 } 82 if !stat.Path().RootCid().Equals(c) { 83 return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().RootCid().String()) 84 } 85 return nil 86 } 87 88 func (api *httpNodeAdder) addMany(ctx context.Context, nds []format.Node, pin bool) error { 89 for _, nd := range nds { 90 // TODO: optimize 91 if err := api.add(ctx, nd, pin); err != nil { 92 return err 93 } 94 } 95 return nil 96 } 97 98 func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error { 99 return (*httpNodeAdder)(api).addMany(ctx, nds, false) 100 } 101 102 func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error { 103 return (*httpNodeAdder)(api).add(ctx, nd, false) 104 } 105 106 func (api *pinningHttpNodeAdder) Add(ctx context.Context, nd format.Node) error { 107 return (*httpNodeAdder)(api).add(ctx, nd, true) 108 } 109 110 func (api *pinningHttpNodeAdder) AddMany(ctx context.Context, nds []format.Node) error { 111 return (*httpNodeAdder)(api).addMany(ctx, nds, true) 112 } 113 114 func (api *HttpDagServ) Pinning() format.NodeAdder { 115 return (*pinningHttpNodeAdder)(api) 116 } 117 118 func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error { 119 return api.core().Block().Rm(ctx, path.FromCid(c)) // TODO: should we force rm? 120 } 121 122 func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { 123 for _, c := range cids { 124 // TODO: optimize 125 if err := api.Remove(ctx, c); err != nil { 126 return err 127 } 128 } 129 return nil 130 } 131 132 func (api *httpNodeAdder) core() *HttpApi { 133 return (*HttpApi)(api) 134 } 135 136 func (api *HttpDagServ) core() *HttpApi { 137 return (*HttpApi)(api) 138 }