swarm.go
1 package coreapi 2 3 import ( 4 "context" 5 "sort" 6 "time" 7 8 coreiface "github.com/ipfs/kubo/core/coreiface" 9 "github.com/ipfs/kubo/tracing" 10 inet "github.com/libp2p/go-libp2p/core/network" 11 "github.com/libp2p/go-libp2p/core/peer" 12 pstore "github.com/libp2p/go-libp2p/core/peerstore" 13 "github.com/libp2p/go-libp2p/core/protocol" 14 "github.com/libp2p/go-libp2p/p2p/net/swarm" 15 ma "github.com/multiformats/go-multiaddr" 16 "go.opentelemetry.io/otel/attribute" 17 "go.opentelemetry.io/otel/trace" 18 ) 19 20 type SwarmAPI CoreAPI 21 22 type connInfo struct { 23 peerstore pstore.Peerstore 24 conn inet.Conn 25 dir inet.Direction 26 27 addr ma.Multiaddr 28 peer peer.ID 29 } 30 31 // tag used in the connection manager when explicitly connecting to a peer. 32 const ( 33 connectionManagerTag = "user-connect" 34 connectionManagerWeight = 100 35 ) 36 37 func (api *SwarmAPI) Connect(ctx context.Context, pi peer.AddrInfo) error { 38 ctx, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Connect", trace.WithAttributes(attribute.String("peerid", pi.ID.String()))) 39 defer span.End() 40 41 if api.peerHost == nil { 42 return coreiface.ErrOffline 43 } 44 45 if swrm, ok := api.peerHost.Network().(*swarm.Swarm); ok { 46 swrm.Backoff().Clear(pi.ID) 47 } 48 49 if err := api.peerHost.Connect(ctx, pi); err != nil { 50 return err 51 } 52 53 api.peerHost.ConnManager().TagPeer(pi.ID, connectionManagerTag, connectionManagerWeight) 54 return nil 55 } 56 57 func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { 58 _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Disconnect", trace.WithAttributes(attribute.String("addr", addr.String()))) 59 defer span.End() 60 61 if api.peerHost == nil { 62 return coreiface.ErrOffline 63 } 64 65 taddr, id := peer.SplitAddr(addr) 66 if id == "" { 67 return peer.ErrInvalidAddr 68 } 69 70 span.SetAttributes(attribute.String("peerid", id.String())) 71 72 net := api.peerHost.Network() 73 if taddr == nil { 74 if net.Connectedness(id) != inet.Connected { 75 return coreiface.ErrNotConnected 76 } 77 if err := net.ClosePeer(id); err != nil { 78 return err 79 } 80 return nil 81 } 82 for _, conn := range net.ConnsToPeer(id) { 83 if !conn.RemoteMultiaddr().Equal(taddr) { 84 continue 85 } 86 87 return conn.Close() 88 } 89 return coreiface.ErrConnNotFound 90 } 91 92 func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]ma.Multiaddr, error) { 93 _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "KnownAddrs") 94 defer span.End() 95 96 if api.peerHost == nil { 97 return nil, coreiface.ErrOffline 98 } 99 100 addrs := make(map[peer.ID][]ma.Multiaddr) 101 ps := api.peerHost.Network().Peerstore() 102 for _, p := range ps.Peers() { 103 addrs[p] = append(addrs[p], ps.Addrs(p)...) 104 sort.Slice(addrs[p], func(i, j int) bool { 105 return addrs[p][i].String() < addrs[p][j].String() 106 }) 107 } 108 109 return addrs, nil 110 } 111 112 func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]ma.Multiaddr, error) { 113 _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "LocalAddrs") 114 defer span.End() 115 116 if api.peerHost == nil { 117 return nil, coreiface.ErrOffline 118 } 119 120 return api.peerHost.Addrs(), nil 121 } 122 123 func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]ma.Multiaddr, error) { 124 _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "ListenAddrs") 125 defer span.End() 126 127 if api.peerHost == nil { 128 return nil, coreiface.ErrOffline 129 } 130 131 return api.peerHost.Network().InterfaceListenAddresses() 132 } 133 134 func (api *SwarmAPI) Peers(ctx context.Context) ([]coreiface.ConnectionInfo, error) { 135 _, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Peers") 136 defer span.End() 137 138 if api.peerHost == nil { 139 return nil, coreiface.ErrOffline 140 } 141 142 conns := api.peerHost.Network().Conns() 143 144 out := make([]coreiface.ConnectionInfo, 0, len(conns)) 145 for _, c := range conns { 146 147 ci := &connInfo{ 148 peerstore: api.peerstore, 149 conn: c, 150 dir: c.Stat().Direction, 151 152 addr: c.RemoteMultiaddr(), 153 peer: c.RemotePeer(), 154 } 155 156 /* 157 // FIXME(steb): 158 swcon, ok := c.(*swarm.Conn) 159 if ok { 160 ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) 161 } 162 */ 163 164 out = append(out, ci) 165 } 166 167 return out, nil 168 } 169 170 func (ci *connInfo) ID() peer.ID { 171 return ci.peer 172 } 173 174 func (ci *connInfo) Address() ma.Multiaddr { 175 return ci.addr 176 } 177 178 func (ci *connInfo) Direction() inet.Direction { 179 return ci.dir 180 } 181 182 func (ci *connInfo) Latency() (time.Duration, error) { 183 return ci.peerstore.LatencyEWMA(peer.ID(ci.ID())), nil 184 } 185 186 func (ci *connInfo) Streams() ([]protocol.ID, error) { 187 streams := ci.conn.GetStreams() 188 189 out := make([]protocol.ID, len(streams)) 190 for i, s := range streams { 191 out[i] = s.Protocol() 192 } 193 194 return out, nil 195 }