/ cleaner.go
cleaner.go
  1  package rendezvous
  2  
  3  import (
  4  	"container/heap"
  5  	"sync"
  6  	"time"
  7  )
  8  
  9  type deadline struct {
 10  	time time.Time
 11  }
 12  
 13  // definitely rename
 14  // Rewrite cleaner to operate on a leveldb directly
 15  // if it is impossible to query on topic+timestamp(big endian) for purging
 16  // store an additional key
 17  func NewCleaner() *Cleaner {
 18  	return &Cleaner{
 19  		heap:      []string{},
 20  		deadlines: map[string]deadline{},
 21  	}
 22  }
 23  
 24  type Cleaner struct {
 25  	mu        sync.RWMutex
 26  	heap      []string
 27  	deadlines map[string]deadline
 28  }
 29  
 30  func (c *Cleaner) Id(index int) string {
 31  	return c.heap[index]
 32  }
 33  
 34  func (c *Cleaner) Len() int {
 35  	return len(c.heap)
 36  }
 37  
 38  func (c *Cleaner) Less(i, j int) bool {
 39  	return c.deadlines[c.Id(i)].time.Before(c.deadlines[c.Id(j)].time)
 40  }
 41  
 42  func (c *Cleaner) Swap(i, j int) {
 43  	c.heap[i], c.heap[j] = c.heap[j], c.heap[i]
 44  }
 45  
 46  func (c *Cleaner) Push(record interface{}) {
 47  	c.heap = append(c.heap, record.(string))
 48  }
 49  
 50  func (c *Cleaner) Pop() interface{} {
 51  	old := c.heap
 52  	n := len(old)
 53  	x := old[n-1]
 54  	c.heap = append([]string{}, old[0:n-1]...)
 55  	_, exist := c.deadlines[x]
 56  	if !exist {
 57  		return x
 58  	}
 59  	delete(c.deadlines, x)
 60  	return x
 61  }
 62  
 63  func (c *Cleaner) Add(deadlineTime time.Time, key string) {
 64  	c.mu.Lock()
 65  	defer c.mu.Unlock()
 66  	dl, exist := c.deadlines[key]
 67  	if !exist {
 68  		dl = deadline{time: deadlineTime}
 69  	} else {
 70  		dl.time = deadlineTime
 71  		for i, n := range c.heap {
 72  			if n == key {
 73  				heap.Remove(c, i)
 74  				break
 75  			}
 76  		}
 77  	}
 78  	c.deadlines[key] = dl
 79  	heap.Push(c, key)
 80  }
 81  
 82  func (c *Cleaner) Exist(key string) bool {
 83  	c.mu.RLock()
 84  	defer c.mu.RUnlock()
 85  	_, exist := c.deadlines[key]
 86  	return exist
 87  }
 88  
 89  func (c *Cleaner) PopSince(now time.Time) (rst []string) {
 90  	c.mu.Lock()
 91  	defer c.mu.Unlock()
 92  	for len(c.heap) != 0 {
 93  		dl, exist := c.deadlines[c.heap[0]]
 94  		if !exist {
 95  			continue
 96  		}
 97  		if now.After(dl.time) {
 98  			rst = append(rst, heap.Pop(c).(string))
 99  		} else {
100  			return rst
101  		}
102  	}
103  	return rst
104  }