/ discovery.go
discovery.go
1 package rendezvous 2 3 import ( 4 "context" 5 "math" 6 "math/rand" 7 "sync" 8 "time" 9 10 "github.com/libp2p/go-libp2p/core/discovery" 11 "github.com/libp2p/go-libp2p/core/host" 12 "github.com/libp2p/go-libp2p/core/peer" 13 ) 14 15 type rendezvousDiscovery struct { 16 rp RendezvousPoint 17 peerCache map[string]*discoveryCache 18 peerCacheMux sync.RWMutex 19 rng *rand.Rand 20 rngMux sync.Mutex 21 } 22 23 type discoveryCache struct { 24 recs map[peer.ID]*peerRecord 25 mux sync.Mutex 26 } 27 28 type peerRecord struct { 29 peer peer.AddrInfo 30 expire int64 31 } 32 33 func NewRendezvousDiscovery(host host.Host) discovery.Discovery { 34 rp := NewRendezvousPoint(host) 35 return &rendezvousDiscovery{rp: rp, peerCache: make(map[string]*discoveryCache), rng: rand.New(rand.NewSource(rand.Int63()))} 36 } 37 38 func (c *rendezvousDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { 39 // Get options 40 var options discovery.Options 41 err := options.Apply(opts...) 42 if err != nil { 43 return 0, err 44 } 45 46 ttl := options.Ttl 47 var ttlSeconds int 48 49 if ttl == 0 { 50 ttlSeconds = MaxTTL 51 } else { 52 ttlSeconds = int(math.Round(ttl.Seconds())) 53 } 54 55 if rttl, err := c.rp.Register(ctx, ns, ttlSeconds); err != nil { 56 return 0, err 57 } else { 58 return rttl, nil 59 } 60 } 61 62 func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { 63 // Get options 64 var options discovery.Options 65 err := options.Apply(opts...) 66 if err != nil { 67 return nil, err 68 } 69 70 const maxLimit = 50 71 limit := options.Limit 72 if limit == 0 || limit > maxLimit { 73 limit = maxLimit 74 } 75 76 // Get cached peers 77 var cache *discoveryCache 78 79 c.peerCacheMux.RLock() 80 cache, ok := c.peerCache[ns] 81 c.peerCacheMux.RUnlock() 82 if !ok { 83 c.peerCacheMux.Lock() 84 cache, ok = c.peerCache[ns] 85 if !ok { 86 cache = &discoveryCache{recs: make(map[peer.ID]*peerRecord)} 87 c.peerCache[ns] = cache 88 } 89 c.peerCacheMux.Unlock() 90 } 91 92 cache.mux.Lock() 93 defer cache.mux.Unlock() 94 95 // Remove all expired entries from cache 96 currentTime := time.Now().Unix() 97 newCacheSize := len(cache.recs) 98 99 for p := range cache.recs { 100 rec := cache.recs[p] 101 if rec.expire < currentTime { 102 newCacheSize-- 103 delete(cache.recs, p) 104 } 105 } 106 107 // Discover new records if we don't have enough 108 if newCacheSize < limit { 109 // TODO: Should we return error even if we have valid cached results? 110 var regs []Registration 111 if regs, err = c.rp.Discover(ctx, ns, limit); err == nil { 112 for _, reg := range regs { 113 rec := &peerRecord{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime} 114 cache.recs[rec.peer.ID] = rec 115 } 116 } 117 } 118 119 // Randomize and fill channel with available records 120 count := len(cache.recs) 121 if limit < count { 122 count = limit 123 } 124 125 chPeer := make(chan peer.AddrInfo, count) 126 127 c.rngMux.Lock() 128 perm := c.rng.Perm(len(cache.recs))[0:count] 129 c.rngMux.Unlock() 130 131 permSet := make(map[int]int) 132 for i, v := range perm { 133 permSet[v] = i 134 } 135 136 sendLst := make([]*peer.AddrInfo, count) 137 iter := 0 138 for k := range cache.recs { 139 if sendIndex, ok := permSet[iter]; ok { 140 sendLst[sendIndex] = &cache.recs[k].peer 141 } 142 iter++ 143 } 144 145 for _, send := range sendLst { 146 chPeer <- *send 147 } 148 149 close(chPeer) 150 return chPeer, err 151 }