/ client / rpc / unixfs.go
unixfs.go
  1  package rpc
  2  
  3  import (
  4  	"context"
  5  	"encoding/json"
  6  	"errors"
  7  	"fmt"
  8  	"io"
  9  	"os"
 10  	"time"
 11  
 12  	"github.com/ipfs/boxo/files"
 13  	unixfs "github.com/ipfs/boxo/ipld/unixfs"
 14  	unixfs_pb "github.com/ipfs/boxo/ipld/unixfs/pb"
 15  	"github.com/ipfs/boxo/path"
 16  	"github.com/ipfs/go-cid"
 17  	iface "github.com/ipfs/kubo/core/coreiface"
 18  	caopts "github.com/ipfs/kubo/core/coreiface/options"
 19  	mh "github.com/multiformats/go-multihash"
 20  )
 21  
 22  type addEvent struct {
 23  	Name  string
 24  	Hash  string `json:",omitempty"`
 25  	Bytes int64  `json:",omitempty"`
 26  	Size  string `json:",omitempty"`
 27  }
 28  
 29  type UnixfsAPI HttpApi
 30  
 31  func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.UnixfsAddOption) (path.ImmutablePath, error) {
 32  	options, _, err := caopts.UnixfsAddOptions(opts...)
 33  	if err != nil {
 34  		return path.ImmutablePath{}, err
 35  	}
 36  
 37  	mht, ok := mh.Codes[options.MhType]
 38  	if !ok {
 39  		return path.ImmutablePath{}, fmt.Errorf("unknowm mhType %d", options.MhType)
 40  	}
 41  
 42  	req := api.core().Request("add").
 43  		Option("hash", mht).
 44  		Option("chunker", options.Chunker).
 45  		Option("cid-version", options.CidVersion).
 46  		Option("fscache", options.FsCache).
 47  		Option("inline", options.Inline).
 48  		Option("inline-limit", options.InlineLimit).
 49  		Option("nocopy", options.NoCopy).
 50  		Option("only-hash", options.OnlyHash).
 51  		Option("pin", options.Pin).
 52  		Option("silent", options.Silent).
 53  		Option("progress", options.Progress)
 54  
 55  	if options.RawLeavesSet {
 56  		req.Option("raw-leaves", options.RawLeaves)
 57  	}
 58  
 59  	switch options.Layout {
 60  	case caopts.BalancedLayout:
 61  		// noop, default
 62  	case caopts.TrickleLayout:
 63  		req.Option("trickle", true)
 64  	}
 65  
 66  	d := files.NewMapDirectory(map[string]files.Node{"": f}) // unwrapped on the other side
 67  
 68  	version, err := api.core().loadRemoteVersion()
 69  	if err != nil {
 70  		return path.ImmutablePath{}, err
 71  	}
 72  	useEncodedAbsPaths := version.LT(encodedAbsolutePathVersion)
 73  	req.Body(files.NewMultiFileReader(d, false, useEncodedAbsPaths))
 74  
 75  	var out addEvent
 76  	resp, err := req.Send(ctx)
 77  	if err != nil {
 78  		return path.ImmutablePath{}, err
 79  	}
 80  	if resp.Error != nil {
 81  		return path.ImmutablePath{}, resp.Error
 82  	}
 83  	defer resp.Output.Close()
 84  	dec := json.NewDecoder(resp.Output)
 85  
 86  	for {
 87  		var evt addEvent
 88  		if err := dec.Decode(&evt); err != nil {
 89  			if errors.Is(err, io.EOF) {
 90  				break
 91  			}
 92  			return path.ImmutablePath{}, err
 93  		}
 94  		out = evt
 95  
 96  		if options.Events != nil {
 97  			ifevt := &iface.AddEvent{
 98  				Name:  out.Name,
 99  				Size:  out.Size,
100  				Bytes: out.Bytes,
101  			}
102  
103  			if out.Hash != "" {
104  				c, err := cid.Parse(out.Hash)
105  				if err != nil {
106  					return path.ImmutablePath{}, err
107  				}
108  
109  				ifevt.Path = path.FromCid(c)
110  			}
111  
112  			select {
113  			case options.Events <- ifevt:
114  			case <-ctx.Done():
115  				return path.ImmutablePath{}, ctx.Err()
116  			}
117  		}
118  	}
119  
120  	c, err := cid.Parse(out.Hash)
121  	if err != nil {
122  		return path.ImmutablePath{}, err
123  	}
124  
125  	return path.FromCid(c), nil
126  }
127  
128  type lsLink struct {
129  	Name, Hash string
130  	Size       uint64
131  	Type       unixfs_pb.Data_DataType
132  	Target     string
133  
134  	Mode    os.FileMode
135  	ModTime time.Time
136  }
137  
138  type lsObject struct {
139  	Hash  string
140  	Links []lsLink
141  }
142  
143  type lsOutput struct {
144  	Objects []lsObject
145  }
146  
147  func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
148  	defer close(out)
149  
150  	options, err := caopts.UnixfsLsOptions(opts...)
151  	if err != nil {
152  		return err
153  	}
154  
155  	resp, err := api.core().Request("ls", p.String()).
156  		Option("resolve-type", options.ResolveChildren).
157  		Option("size", options.ResolveChildren).
158  		Option("stream", true).
159  		Send(ctx)
160  	if err != nil {
161  		return err
162  	}
163  	if resp.Error != nil {
164  		return err
165  	}
166  	defer resp.Close()
167  
168  	dec := json.NewDecoder(resp.Output)
169  
170  	for {
171  		var link lsOutput
172  		if err = dec.Decode(&link); err != nil {
173  			if err != io.EOF {
174  				return err
175  			}
176  			return nil
177  		}
178  
179  		if len(link.Objects) != 1 {
180  			return errors.New("unexpected Objects len")
181  		}
182  
183  		if len(link.Objects[0].Links) != 1 {
184  			return errors.New("unexpected Links len")
185  		}
186  
187  		l0 := link.Objects[0].Links[0]
188  
189  		c, err := cid.Decode(l0.Hash)
190  		if err != nil {
191  			return err
192  		}
193  
194  		var ftype iface.FileType
195  		switch l0.Type {
196  		case unixfs.TRaw, unixfs.TFile:
197  			ftype = iface.TFile
198  		case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
199  			ftype = iface.TDirectory
200  		case unixfs.TSymlink:
201  			ftype = iface.TSymlink
202  		}
203  
204  		select {
205  		case out <- iface.DirEntry{
206  			Name:   l0.Name,
207  			Cid:    c,
208  			Size:   l0.Size,
209  			Type:   ftype,
210  			Target: l0.Target,
211  
212  			Mode:    l0.Mode,
213  			ModTime: l0.ModTime,
214  		}:
215  		case <-ctx.Done():
216  			return ctx.Err()
217  		}
218  	}
219  }
220  
221  func (api *UnixfsAPI) core() *HttpApi {
222  	return (*HttpApi)(api)
223  }