// cleaner.go
1 package rendezvous 2 3 import ( 4 "container/heap" 5 "sync" 6 "time" 7 ) 8 9 type deadline struct { 10 time time.Time 11 } 12 13 // definitely rename 14 // Rewrite cleaner to operate on a leveldb directly 15 // if it is impossible to query on topic+timestamp(big endian) for purging 16 // store an additional key 17 func NewCleaner() *Cleaner { 18 return &Cleaner{ 19 heap: []string{}, 20 deadlines: map[string]deadline{}, 21 } 22 } 23 24 type Cleaner struct { 25 mu sync.RWMutex 26 heap []string 27 deadlines map[string]deadline 28 } 29 30 func (c *Cleaner) Id(index int) string { 31 return c.heap[index] 32 } 33 34 func (c *Cleaner) Len() int { 35 return len(c.heap) 36 } 37 38 func (c *Cleaner) Less(i, j int) bool { 39 return c.deadlines[c.Id(i)].time.Before(c.deadlines[c.Id(j)].time) 40 } 41 42 func (c *Cleaner) Swap(i, j int) { 43 c.heap[i], c.heap[j] = c.heap[j], c.heap[i] 44 } 45 46 func (c *Cleaner) Push(record interface{}) { 47 c.heap = append(c.heap, record.(string)) 48 } 49 50 func (c *Cleaner) Pop() interface{} { 51 old := c.heap 52 n := len(old) 53 x := old[n-1] 54 c.heap = append([]string{}, old[0:n-1]...) 
55 _, exist := c.deadlines[x] 56 if !exist { 57 return x 58 } 59 delete(c.deadlines, x) 60 return x 61 } 62 63 func (c *Cleaner) Add(deadlineTime time.Time, key string) { 64 c.mu.Lock() 65 defer c.mu.Unlock() 66 dl, exist := c.deadlines[key] 67 if !exist { 68 dl = deadline{time: deadlineTime} 69 } else { 70 dl.time = deadlineTime 71 for i, n := range c.heap { 72 if n == key { 73 heap.Remove(c, i) 74 break 75 } 76 } 77 } 78 c.deadlines[key] = dl 79 heap.Push(c, key) 80 } 81 82 func (c *Cleaner) Exist(key string) bool { 83 c.mu.RLock() 84 defer c.mu.RUnlock() 85 _, exist := c.deadlines[key] 86 return exist 87 } 88 89 func (c *Cleaner) PopSince(now time.Time) (rst []string) { 90 c.mu.Lock() 91 defer c.mu.Unlock() 92 for len(c.heap) != 0 { 93 dl, exist := c.deadlines[c.heap[0]] 94 if !exist { 95 continue 96 } 97 if now.After(dl.time) { 98 rst = append(rst, heap.Pop(c).(string)) 99 } else { 100 return rst 101 } 102 } 103 return rst 104 }