monitor.go
1 package system 2 3 import ( 4 "context" 5 _ "embed" 6 "fmt" 7 "time" 8 9 dbus "github.com/coreos/go-systemd/v22/dbus" 10 dogeboxd "github.com/dogeorg/dogeboxd/pkg" 11 "github.com/shirou/gopsutil/v4/process" 12 ) 13 14 const ( 15 MONITOR_INTERVAL time.Duration = 10 * time.Second 16 ) 17 18 func NewSystemMonitor(config dogeboxd.ServerConfig) SystemMonitor { 19 services := []string{} 20 return SystemMonitor{ 21 config: config, 22 services: services, 23 mon: make(chan []string, 10), 24 stats: make(chan map[string]dogeboxd.ProcStatus), 25 fastMon: make(chan string, 10), 26 fastStats: make(chan map[string]dogeboxd.ProcStatus), 27 } 28 } 29 30 /* SystemMonitor 31 * 32 * SystemMonitor accepts arrays of strings contianing 33 * Systemd service names, ie: 'dogecoind.service' via 34 * it's 'mon' channel. These are then observed every N 35 * seconds and the monitor issues []dogeboxd.ProcStatus 36 * results on the 'stats' channel. 37 * 38 * send a service name beginning with '-' to remove a 39 * service from the monitoring list. 40 * 41 * Sending a service (again) will cause the SystemMonitor 42 * to respond immediately with ProcStatus for those services. 43 */ 44 45 type SystemMonitor struct { 46 config dogeboxd.ServerConfig 47 services []string 48 mon chan []string 49 stats chan map[string]dogeboxd.ProcStatus 50 fastMon chan string 51 fastStats chan map[string]dogeboxd.ProcStatus 52 } 53 54 func (t SystemMonitor) Run(started, stopped chan bool, stop chan context.Context) error { 55 go func() { 56 go func() { 57 timer := time.NewTimer(MONITOR_INTERVAL) 58 defer timer.Stop() 59 ctx, stopLoopers := context.WithCancel(context.Background()) 60 mainloop: 61 for { 62 select { 63 case <-stop: 64 stopLoopers() // kill any fast loopers running 65 break mainloop 66 case s := <-t.mon: 67 t.updateServices(s) 68 stats, err := t.runChecks(t.services) 69 if err != nil { 70 continue mainloop 71 } 72 select { 73 case t.stats <- stats: 74 default: 75 fmt.Println("couldn't write to output channel") 76 } 77 78 case s := <-t.fastMon: 79 stop, _ := context.WithCancel(ctx) 80 t.fastLooper(s, stop) // quickly iterate run check for a pup starting/stopping 81 case <-timer.C: 82 stats, err := t.runChecks(t.services) 83 if err != nil { 84 continue mainloop 85 } 86 select { 87 case t.stats <- stats: 88 default: 89 fmt.Println("couldn't write to output channel") 90 } 91 92 timer.Reset(MONITOR_INTERVAL) 93 } 94 } 95 }() 96 97 started <- true 98 <-stop 99 // do shutdown things 100 stopped <- true 101 }() 102 return nil 103 } 104 105 /* This function quickly polls for the status of a pup 106 * that we expect is shutting down or starting up. 107 */ 108 func (t *SystemMonitor) fastLooper(service string, stop context.Context) { 109 // fast looper, one specific pup 110 go func() { 111 intervals := []time.Duration{ 112 1 * time.Second, 113 1 * time.Second, 114 1 * time.Second, 115 3 * time.Second, 116 5 * time.Second, 117 } 118 119 place := 0 120 timer := time.NewTimer(intervals[place]) 121 defer timer.Stop() 122 123 for { 124 select { 125 case <-stop.Done(): 126 break 127 case <-timer.C: 128 stats, err := t.runChecks([]string{service}) 129 if err != nil { 130 continue 131 } 132 133 select { 134 case t.fastStats <- stats: 135 default: 136 fmt.Println("couldn't write to output channel") 137 } 138 139 place++ 140 if place >= len(intervals) { 141 break 142 } 143 timer.Reset(intervals[place]) 144 } 145 } 146 }() 147 } 148 149 func (t *SystemMonitor) runChecks(services []string) (map[string]dogeboxd.ProcStatus, error) { 150 stats, err := getStatus(services) 151 if err != nil { 152 fmt.Println("error getting stats from systemd:", err) 153 return stats, err 154 } 155 return stats, err 156 } 157 158 func (t *SystemMonitor) updateServices(args []string) { 159 t.services = args 160 } 161 162 func getStatus(serviceNames []string) (map[string]dogeboxd.ProcStatus, error) { 163 conn, err := dbus.NewWithContext(context.Background()) 164 if err != nil { 165 panic(err) 166 } 167 defer conn.Close() 168 169 out := map[string]dogeboxd.ProcStatus{} 170 for _, service := range serviceNames { 171 pidProp, err := conn.GetServicePropertyContext(context.Background(), service, "MainPID") 172 if err != nil { 173 continue 174 } 175 pid := pidProp.Value.Value().(uint32) 176 cpu := float64(0) 177 mem := float64(0) 178 rssM := float64(0) 179 running := false 180 181 proc, err := process.NewProcess(int32(pid)) 182 if err == nil { 183 running = true 184 185 c, err := proc.CPUPercent() 186 if err == nil { 187 cpu = c 188 } 189 190 m, err := proc.MemoryPercent() 191 if err == nil { 192 mem = float64(m) 193 } 194 195 memInfo, err := proc.MemoryInfo() 196 if err == nil { 197 rssM = float64(memInfo.RSS) / float64(1048576) 198 } 199 200 } 201 202 out[service] = dogeboxd.ProcStatus{ 203 CPUPercent: cpu, 204 MEMPercent: mem, 205 MEMMb: rssM, 206 Running: running, 207 } 208 } 209 210 return out, err 211 } 212 213 func (t SystemMonitor) GetMonChannel() chan []string { 214 return t.mon 215 } 216 217 func (t SystemMonitor) GetStatChannel() chan map[string]dogeboxd.ProcStatus { 218 return t.stats 219 } 220 221 func (t SystemMonitor) GetFastMonChannel() chan string { 222 return t.fastMon 223 } 224 225 func (t SystemMonitor) GetFastStatChannel() chan map[string]dogeboxd.ProcStatus { 226 return t.fastStats 227 }