/ core / coreapi / swarm.go
swarm.go
  1  package coreapi
  2  
  3  import (
  4  	"context"
  5  	"sort"
  6  	"time"
  7  
  8  	coreiface "github.com/ipfs/kubo/core/coreiface"
  9  	"github.com/ipfs/kubo/tracing"
 10  	inet "github.com/libp2p/go-libp2p/core/network"
 11  	"github.com/libp2p/go-libp2p/core/peer"
 12  	pstore "github.com/libp2p/go-libp2p/core/peerstore"
 13  	"github.com/libp2p/go-libp2p/core/protocol"
 14  	"github.com/libp2p/go-libp2p/p2p/net/swarm"
 15  	ma "github.com/multiformats/go-multiaddr"
 16  	"go.opentelemetry.io/otel/attribute"
 17  	"go.opentelemetry.io/otel/trace"
 18  )
 19  
 20  type SwarmAPI CoreAPI
 21  
 22  type connInfo struct {
 23  	peerstore pstore.Peerstore
 24  	conn      inet.Conn
 25  	dir       inet.Direction
 26  
 27  	addr ma.Multiaddr
 28  	peer peer.ID
 29  }
 30  
 31  // tag used in the connection manager when explicitly connecting to a peer.
 32  const (
 33  	connectionManagerTag    = "user-connect"
 34  	connectionManagerWeight = 100
 35  )
 36  
 37  func (api *SwarmAPI) Connect(ctx context.Context, pi peer.AddrInfo) error {
 38  	ctx, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Connect", trace.WithAttributes(attribute.String("peerid", pi.ID.String())))
 39  	defer span.End()
 40  
 41  	if api.peerHost == nil {
 42  		return coreiface.ErrOffline
 43  	}
 44  
 45  	if swrm, ok := api.peerHost.Network().(*swarm.Swarm); ok {
 46  		swrm.Backoff().Clear(pi.ID)
 47  	}
 48  
 49  	if err := api.peerHost.Connect(ctx, pi); err != nil {
 50  		return err
 51  	}
 52  
 53  	api.peerHost.ConnManager().TagPeer(pi.ID, connectionManagerTag, connectionManagerWeight)
 54  	return nil
 55  }
 56  
 57  func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
 58  	_, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Disconnect", trace.WithAttributes(attribute.String("addr", addr.String())))
 59  	defer span.End()
 60  
 61  	if api.peerHost == nil {
 62  		return coreiface.ErrOffline
 63  	}
 64  
 65  	taddr, id := peer.SplitAddr(addr)
 66  	if id == "" {
 67  		return peer.ErrInvalidAddr
 68  	}
 69  
 70  	span.SetAttributes(attribute.String("peerid", id.String()))
 71  
 72  	net := api.peerHost.Network()
 73  	if taddr == nil {
 74  		if net.Connectedness(id) != inet.Connected {
 75  			return coreiface.ErrNotConnected
 76  		}
 77  		if err := net.ClosePeer(id); err != nil {
 78  			return err
 79  		}
 80  		return nil
 81  	}
 82  	for _, conn := range net.ConnsToPeer(id) {
 83  		if !conn.RemoteMultiaddr().Equal(taddr) {
 84  			continue
 85  		}
 86  
 87  		return conn.Close()
 88  	}
 89  	return coreiface.ErrConnNotFound
 90  }
 91  
 92  func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]ma.Multiaddr, error) {
 93  	_, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "KnownAddrs")
 94  	defer span.End()
 95  
 96  	if api.peerHost == nil {
 97  		return nil, coreiface.ErrOffline
 98  	}
 99  
100  	addrs := make(map[peer.ID][]ma.Multiaddr)
101  	ps := api.peerHost.Network().Peerstore()
102  	for _, p := range ps.Peers() {
103  		addrs[p] = append(addrs[p], ps.Addrs(p)...)
104  		sort.Slice(addrs[p], func(i, j int) bool {
105  			return addrs[p][i].String() < addrs[p][j].String()
106  		})
107  	}
108  
109  	return addrs, nil
110  }
111  
112  func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]ma.Multiaddr, error) {
113  	_, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "LocalAddrs")
114  	defer span.End()
115  
116  	if api.peerHost == nil {
117  		return nil, coreiface.ErrOffline
118  	}
119  
120  	return api.peerHost.Addrs(), nil
121  }
122  
123  func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]ma.Multiaddr, error) {
124  	_, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "ListenAddrs")
125  	defer span.End()
126  
127  	if api.peerHost == nil {
128  		return nil, coreiface.ErrOffline
129  	}
130  
131  	return api.peerHost.Network().InterfaceListenAddresses()
132  }
133  
134  func (api *SwarmAPI) Peers(ctx context.Context) ([]coreiface.ConnectionInfo, error) {
135  	_, span := tracing.Span(ctx, "CoreAPI.SwarmAPI", "Peers")
136  	defer span.End()
137  
138  	if api.peerHost == nil {
139  		return nil, coreiface.ErrOffline
140  	}
141  
142  	conns := api.peerHost.Network().Conns()
143  
144  	out := make([]coreiface.ConnectionInfo, 0, len(conns))
145  	for _, c := range conns {
146  
147  		ci := &connInfo{
148  			peerstore: api.peerstore,
149  			conn:      c,
150  			dir:       c.Stat().Direction,
151  
152  			addr: c.RemoteMultiaddr(),
153  			peer: c.RemotePeer(),
154  		}
155  
156  		/*
157  			// FIXME(steb):
158  			swcon, ok := c.(*swarm.Conn)
159  			if ok {
160  				ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn())
161  			}
162  		*/
163  
164  		out = append(out, ci)
165  	}
166  
167  	return out, nil
168  }
169  
170  func (ci *connInfo) ID() peer.ID {
171  	return ci.peer
172  }
173  
174  func (ci *connInfo) Address() ma.Multiaddr {
175  	return ci.addr
176  }
177  
178  func (ci *connInfo) Direction() inet.Direction {
179  	return ci.dir
180  }
181  
182  func (ci *connInfo) Latency() (time.Duration, error) {
183  	return ci.peerstore.LatencyEWMA(peer.ID(ci.ID())), nil
184  }
185  
186  func (ci *connInfo) Streams() ([]protocol.ID, error) {
187  	streams := ci.conn.GetStreams()
188  
189  	out := make([]protocol.ID, len(streams))
190  	for i, s := range streams {
191  		out[i] = s.Protocol()
192  	}
193  
194  	return out, nil
195  }