unixfs.go
1 package rpc 2 3 import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "os" 10 "time" 11 12 "github.com/ipfs/boxo/files" 13 unixfs "github.com/ipfs/boxo/ipld/unixfs" 14 unixfs_pb "github.com/ipfs/boxo/ipld/unixfs/pb" 15 "github.com/ipfs/boxo/path" 16 "github.com/ipfs/go-cid" 17 iface "github.com/ipfs/kubo/core/coreiface" 18 caopts "github.com/ipfs/kubo/core/coreiface/options" 19 mh "github.com/multiformats/go-multihash" 20 ) 21 22 type addEvent struct { 23 Name string 24 Hash string `json:",omitempty"` 25 Bytes int64 `json:",omitempty"` 26 Size string `json:",omitempty"` 27 } 28 29 type UnixfsAPI HttpApi 30 31 func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.UnixfsAddOption) (path.ImmutablePath, error) { 32 options, _, err := caopts.UnixfsAddOptions(opts...) 33 if err != nil { 34 return path.ImmutablePath{}, err 35 } 36 37 mht, ok := mh.Codes[options.MhType] 38 if !ok { 39 return path.ImmutablePath{}, fmt.Errorf("unknowm mhType %d", options.MhType) 40 } 41 42 req := api.core().Request("add"). 43 Option("hash", mht). 44 Option("chunker", options.Chunker). 45 Option("cid-version", options.CidVersion). 46 Option("fscache", options.FsCache). 47 Option("inline", options.Inline). 48 Option("inline-limit", options.InlineLimit). 49 Option("nocopy", options.NoCopy). 50 Option("only-hash", options.OnlyHash). 51 Option("pin", options.Pin). 52 Option("silent", options.Silent). 53 Option("progress", options.Progress) 54 55 if options.RawLeavesSet { 56 req.Option("raw-leaves", options.RawLeaves) 57 } 58 59 switch options.Layout { 60 case caopts.BalancedLayout: 61 // noop, default 62 case caopts.TrickleLayout: 63 req.Option("trickle", true) 64 } 65 66 d := files.NewMapDirectory(map[string]files.Node{"": f}) // unwrapped on the other side 67 68 version, err := api.core().loadRemoteVersion() 69 if err != nil { 70 return path.ImmutablePath{}, err 71 } 72 useEncodedAbsPaths := version.LT(encodedAbsolutePathVersion) 73 req.Body(files.NewMultiFileReader(d, false, useEncodedAbsPaths)) 74 75 var out addEvent 76 resp, err := req.Send(ctx) 77 if err != nil { 78 return path.ImmutablePath{}, err 79 } 80 if resp.Error != nil { 81 return path.ImmutablePath{}, resp.Error 82 } 83 defer resp.Output.Close() 84 dec := json.NewDecoder(resp.Output) 85 86 for { 87 var evt addEvent 88 if err := dec.Decode(&evt); err != nil { 89 if errors.Is(err, io.EOF) { 90 break 91 } 92 return path.ImmutablePath{}, err 93 } 94 out = evt 95 96 if options.Events != nil { 97 ifevt := &iface.AddEvent{ 98 Name: out.Name, 99 Size: out.Size, 100 Bytes: out.Bytes, 101 } 102 103 if out.Hash != "" { 104 c, err := cid.Parse(out.Hash) 105 if err != nil { 106 return path.ImmutablePath{}, err 107 } 108 109 ifevt.Path = path.FromCid(c) 110 } 111 112 select { 113 case options.Events <- ifevt: 114 case <-ctx.Done(): 115 return path.ImmutablePath{}, ctx.Err() 116 } 117 } 118 } 119 120 c, err := cid.Parse(out.Hash) 121 if err != nil { 122 return path.ImmutablePath{}, err 123 } 124 125 return path.FromCid(c), nil 126 } 127 128 type lsLink struct { 129 Name, Hash string 130 Size uint64 131 Type unixfs_pb.Data_DataType 132 Target string 133 134 Mode os.FileMode 135 ModTime time.Time 136 } 137 138 type lsObject struct { 139 Hash string 140 Links []lsLink 141 } 142 143 type lsOutput struct { 144 Objects []lsObject 145 } 146 147 func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error { 148 defer close(out) 149 150 options, err := caopts.UnixfsLsOptions(opts...) 151 if err != nil { 152 return err 153 } 154 155 resp, err := api.core().Request("ls", p.String()). 156 Option("resolve-type", options.ResolveChildren). 157 Option("size", options.ResolveChildren). 158 Option("stream", true). 159 Send(ctx) 160 if err != nil { 161 return err 162 } 163 if resp.Error != nil { 164 return err 165 } 166 defer resp.Close() 167 168 dec := json.NewDecoder(resp.Output) 169 170 for { 171 var link lsOutput 172 if err = dec.Decode(&link); err != nil { 173 if err != io.EOF { 174 return err 175 } 176 return nil 177 } 178 179 if len(link.Objects) != 1 { 180 return errors.New("unexpected Objects len") 181 } 182 183 if len(link.Objects[0].Links) != 1 { 184 return errors.New("unexpected Links len") 185 } 186 187 l0 := link.Objects[0].Links[0] 188 189 c, err := cid.Decode(l0.Hash) 190 if err != nil { 191 return err 192 } 193 194 var ftype iface.FileType 195 switch l0.Type { 196 case unixfs.TRaw, unixfs.TFile: 197 ftype = iface.TFile 198 case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata: 199 ftype = iface.TDirectory 200 case unixfs.TSymlink: 201 ftype = iface.TSymlink 202 } 203 204 select { 205 case out <- iface.DirEntry{ 206 Name: l0.Name, 207 Cid: c, 208 Size: l0.Size, 209 Type: ftype, 210 Target: l0.Target, 211 212 Mode: l0.Mode, 213 ModTime: l0.ModTime, 214 }: 215 case <-ctx.Done(): 216 return ctx.Err() 217 } 218 } 219 } 220 221 func (api *UnixfsAPI) core() *HttpApi { 222 return (*HttpApi)(api) 223 }