// discovery.go
  1  package rendezvous
  2  
  3  import (
  4  	"context"
  5  	"math"
  6  	"math/rand"
  7  	"sync"
  8  	"time"
  9  
 10  	"github.com/libp2p/go-libp2p/core/discovery"
 11  	"github.com/libp2p/go-libp2p/core/host"
 12  	"github.com/libp2p/go-libp2p/core/peer"
 13  )
 14  
 15  type rendezvousDiscovery struct {
 16  	rp           RendezvousPoint
 17  	peerCache    map[string]*discoveryCache
 18  	peerCacheMux sync.RWMutex
 19  	rng          *rand.Rand
 20  	rngMux       sync.Mutex
 21  }
 22  
 23  type discoveryCache struct {
 24  	recs map[peer.ID]*peerRecord
 25  	mux  sync.Mutex
 26  }
 27  
 28  type peerRecord struct {
 29  	peer   peer.AddrInfo
 30  	expire int64
 31  }
 32  
 33  func NewRendezvousDiscovery(host host.Host) discovery.Discovery {
 34  	rp := NewRendezvousPoint(host)
 35  	return &rendezvousDiscovery{rp: rp, peerCache: make(map[string]*discoveryCache), rng: rand.New(rand.NewSource(rand.Int63()))}
 36  }
 37  
 38  func (c *rendezvousDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
 39  	// Get options
 40  	var options discovery.Options
 41  	err := options.Apply(opts...)
 42  	if err != nil {
 43  		return 0, err
 44  	}
 45  
 46  	ttl := options.Ttl
 47  	var ttlSeconds int
 48  
 49  	if ttl == 0 {
 50  		ttlSeconds = MaxTTL
 51  	} else {
 52  		ttlSeconds = int(math.Round(ttl.Seconds()))
 53  	}
 54  
 55  	if rttl, err := c.rp.Register(ctx, ns, ttlSeconds); err != nil {
 56  		return 0, err
 57  	} else {
 58  		return rttl, nil
 59  	}
 60  }
 61  
 62  func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
 63  	// Get options
 64  	var options discovery.Options
 65  	err := options.Apply(opts...)
 66  	if err != nil {
 67  		return nil, err
 68  	}
 69  
 70  	const maxLimit = 50
 71  	limit := options.Limit
 72  	if limit == 0 || limit > maxLimit {
 73  		limit = maxLimit
 74  	}
 75  
 76  	// Get cached peers
 77  	var cache *discoveryCache
 78  
 79  	c.peerCacheMux.RLock()
 80  	cache, ok := c.peerCache[ns]
 81  	c.peerCacheMux.RUnlock()
 82  	if !ok {
 83  		c.peerCacheMux.Lock()
 84  		cache, ok = c.peerCache[ns]
 85  		if !ok {
 86  			cache = &discoveryCache{recs: make(map[peer.ID]*peerRecord)}
 87  			c.peerCache[ns] = cache
 88  		}
 89  		c.peerCacheMux.Unlock()
 90  	}
 91  
 92  	cache.mux.Lock()
 93  	defer cache.mux.Unlock()
 94  
 95  	// Remove all expired entries from cache
 96  	currentTime := time.Now().Unix()
 97  	newCacheSize := len(cache.recs)
 98  
 99  	for p := range cache.recs {
100  		rec := cache.recs[p]
101  		if rec.expire < currentTime {
102  			newCacheSize--
103  			delete(cache.recs, p)
104  		}
105  	}
106  
107  	// Discover new records if we don't have enough
108  	if newCacheSize < limit {
109  		// TODO: Should we return error even if we have valid cached results?
110  		var regs []Registration
111  		if regs, err = c.rp.Discover(ctx, ns, limit); err == nil {
112  			for _, reg := range regs {
113  				rec := &peerRecord{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime}
114  				cache.recs[rec.peer.ID] = rec
115  			}
116  		}
117  	}
118  
119  	// Randomize and fill channel with available records
120  	count := len(cache.recs)
121  	if limit < count {
122  		count = limit
123  	}
124  
125  	chPeer := make(chan peer.AddrInfo, count)
126  
127  	c.rngMux.Lock()
128  	perm := c.rng.Perm(len(cache.recs))[0:count]
129  	c.rngMux.Unlock()
130  
131  	permSet := make(map[int]int)
132  	for i, v := range perm {
133  		permSet[v] = i
134  	}
135  
136  	sendLst := make([]*peer.AddrInfo, count)
137  	iter := 0
138  	for k := range cache.recs {
139  		if sendIndex, ok := permSet[iter]; ok {
140  			sendLst[sendIndex] = &cache.recs[k].peer
141  		}
142  		iter++
143  	}
144  
145  	for _, send := range sendLst {
146  		chPeer <- *send
147  	}
148  
149  	close(chPeer)
150  	return chPeer, err
151  }