/ pkg / system / monitor.go
monitor.go
  1  package system
  2  
  3  import (
  4  	"context"
  5  	_ "embed"
  6  	"fmt"
  7  	"time"
  8  
  9  	dbus "github.com/coreos/go-systemd/v22/dbus"
 10  	dogeboxd "github.com/dogeorg/dogeboxd/pkg"
 11  	"github.com/shirou/gopsutil/v4/process"
 12  )
 13  
const (
	// MONITOR_INTERVAL is the delay between periodic status polls of the
	// monitored services; Run arms its timer with this duration.
	MONITOR_INTERVAL time.Duration = 10 * time.Second
)
 17  
 18  func NewSystemMonitor(config dogeboxd.ServerConfig) SystemMonitor {
 19  	services := []string{}
 20  	return SystemMonitor{
 21  		config:    config,
 22  		services:  services,
 23  		mon:       make(chan []string, 10),
 24  		stats:     make(chan map[string]dogeboxd.ProcStatus),
 25  		fastMon:   make(chan string, 10),
 26  		fastStats: make(chan map[string]dogeboxd.ProcStatus),
 27  	}
 28  }
 29  
/* SystemMonitor
 *
 * SystemMonitor accepts slices of strings containing
 * systemd service names, e.g. 'dogecoind.service', via
 * its 'mon' channel. These are then observed every
 * MONITOR_INTERVAL and the monitor issues
 * map[string]dogeboxd.ProcStatus results (keyed by
 * service name) on the 'stats' channel.
 *
 * Send a service name beginning with '-' to remove a
 * service from the monitoring list.
 * (NOTE: updateServices in this file replaces the whole
 * list and does not interpret a '-' prefix - TODO confirm
 * whether removal is handled elsewhere.)
 *
 * Sending a service (again) will cause the SystemMonitor
 * to respond immediately with ProcStatus for those services.
 */
 44  
// SystemMonitor polls systemd services for process statistics and
// publishes the results on its output channels.
type SystemMonitor struct {
	// config is the server configuration handed to NewSystemMonitor.
	config    dogeboxd.ServerConfig
	// services is the list of service names checked on every poll; it is
	// replaced wholesale by updateServices.
	services  []string
	// mon receives a new list of service names to monitor (buffered, 10).
	mon       chan []string
	// stats carries the per-service results of regular polls (unbuffered;
	// Run sends without blocking and drops the result if nobody is reading).
	stats     chan map[string]dogeboxd.ProcStatus
	// fastMon receives a single service name to poll rapidly (buffered, 10).
	fastMon   chan string
	// fastStats carries the results produced by fastLooper (unbuffered,
	// non-blocking send).
	fastStats chan map[string]dogeboxd.ProcStatus
}
 53  
 54  func (t SystemMonitor) Run(started, stopped chan bool, stop chan context.Context) error {
 55  	go func() {
 56  		go func() {
 57  			timer := time.NewTimer(MONITOR_INTERVAL)
 58  			defer timer.Stop()
 59  			ctx, stopLoopers := context.WithCancel(context.Background())
 60  		mainloop:
 61  			for {
 62  				select {
 63  				case <-stop:
 64  					stopLoopers() // kill any fast loopers running
 65  					break mainloop
 66  				case s := <-t.mon:
 67  					t.updateServices(s)
 68  					stats, err := t.runChecks(t.services)
 69  					if err != nil {
 70  						continue mainloop
 71  					}
 72  					select {
 73  					case t.stats <- stats:
 74  					default:
 75  						fmt.Println("couldn't write to output channel")
 76  					}
 77  
 78  				case s := <-t.fastMon:
 79  					stop, _ := context.WithCancel(ctx)
 80  					t.fastLooper(s, stop) // quickly iterate run check for a pup starting/stopping
 81  				case <-timer.C:
 82  					stats, err := t.runChecks(t.services)
 83  					if err != nil {
 84  						continue mainloop
 85  					}
 86  					select {
 87  					case t.stats <- stats:
 88  					default:
 89  						fmt.Println("couldn't write to output channel")
 90  					}
 91  
 92  					timer.Reset(MONITOR_INTERVAL)
 93  				}
 94  			}
 95  		}()
 96  
 97  		started <- true
 98  		<-stop
 99  		// do shutdown things
100  		stopped <- true
101  	}()
102  	return nil
103  }
104  
105  /* This function quickly polls for the status of a pup
106  * that we expect is shutting down or starting up.
107   */
108  func (t *SystemMonitor) fastLooper(service string, stop context.Context) {
109  	// fast looper, one specific pup
110  	go func() {
111  		intervals := []time.Duration{
112  			1 * time.Second,
113  			1 * time.Second,
114  			1 * time.Second,
115  			3 * time.Second,
116  			5 * time.Second,
117  		}
118  
119  		place := 0
120  		timer := time.NewTimer(intervals[place])
121  		defer timer.Stop()
122  
123  		for {
124  			select {
125  			case <-stop.Done():
126  				break
127  			case <-timer.C:
128  				stats, err := t.runChecks([]string{service})
129  				if err != nil {
130  					continue
131  				}
132  
133  				select {
134  				case t.fastStats <- stats:
135  				default:
136  					fmt.Println("couldn't write to output channel")
137  				}
138  
139  				place++
140  				if place >= len(intervals) {
141  					break
142  				}
143  				timer.Reset(intervals[place])
144  			}
145  		}
146  	}()
147  }
148  
149  func (t *SystemMonitor) runChecks(services []string) (map[string]dogeboxd.ProcStatus, error) {
150  	stats, err := getStatus(services)
151  	if err != nil {
152  		fmt.Println("error getting stats from systemd:", err)
153  		return stats, err
154  	}
155  	return stats, err
156  }
157  
// updateServices replaces the monitored service list with args. The slice is
// stored as-is (not copied), and no entry is interpreted specially.
func (t *SystemMonitor) updateServices(args []string) {
	t.services = args
}
161  
162  func getStatus(serviceNames []string) (map[string]dogeboxd.ProcStatus, error) {
163  	conn, err := dbus.NewWithContext(context.Background())
164  	if err != nil {
165  		panic(err)
166  	}
167  	defer conn.Close()
168  
169  	out := map[string]dogeboxd.ProcStatus{}
170  	for _, service := range serviceNames {
171  		pidProp, err := conn.GetServicePropertyContext(context.Background(), service, "MainPID")
172  		if err != nil {
173  			continue
174  		}
175  		pid := pidProp.Value.Value().(uint32)
176  		cpu := float64(0)
177  		mem := float64(0)
178  		rssM := float64(0)
179  		running := false
180  
181  		proc, err := process.NewProcess(int32(pid))
182  		if err == nil {
183  			running = true
184  
185  			c, err := proc.CPUPercent()
186  			if err == nil {
187  				cpu = c
188  			}
189  
190  			m, err := proc.MemoryPercent()
191  			if err == nil {
192  				mem = float64(m)
193  			}
194  
195  			memInfo, err := proc.MemoryInfo()
196  			if err == nil {
197  				rssM = float64(memInfo.RSS) / float64(1048576)
198  			}
199  
200  		}
201  
202  		out[service] = dogeboxd.ProcStatus{
203  			CPUPercent: cpu,
204  			MEMPercent: mem,
205  			MEMMb:      rssM,
206  			Running:    running,
207  		}
208  	}
209  
210  	return out, err
211  }
212  
// GetMonChannel returns the mon channel, on which Run receives lists of
// service names to monitor.
func (t SystemMonitor) GetMonChannel() chan []string {
	return t.mon
}
216  
// GetStatChannel returns the stats channel, on which Run publishes the
// per-service status map after each poll.
func (t SystemMonitor) GetStatChannel() chan map[string]dogeboxd.ProcStatus {
	return t.stats
}
220  
// GetFastMonChannel returns the fastMon channel, on which Run receives a
// single service name to hand to fastLooper.
func (t SystemMonitor) GetFastMonChannel() chan string {
	return t.fastMon
}
224  
// GetFastStatChannel returns the fastStats channel, on which fastLooper
// publishes its status maps.
func (t SystemMonitor) GetFastStatChannel() chan map[string]dogeboxd.ProcStatus {
	return t.fastStats
}