/ pkg / web / websocket_updates.go
websocket_updates.go
  1  package web
  2  
  3  import (
  4  	"context"
  5  	"fmt"
  6  	"log"
  7  	"time"
  8  
  9  	dogeboxd "github.com/dogeorg/dogeboxd/pkg"
 10  	"golang.org/x/net/websocket"
 11  )
 12  
// WSRelay fans Change messages out to every registered websocket
// connection. Values are created with NewWSRelay; the relay loop is
// started with Run and connections are registered through the handler
// returned by GetWSHandler.
type WSRelay struct {
	config dogeboxd.ServerConfig // server configuration handed to NewWSRelay
	socks  []*WSCONN             // all currently tracked connections
	relay  chan dogeboxd.Change  // incoming Change messages to broadcast
	newWs  chan *WSCONN          // newly accepted connections awaiting registration
}
 19  
 20  func NewWSRelay(config dogeboxd.ServerConfig, relay chan dogeboxd.Change) WSRelay {
 21  	if config.Recovery {
 22  		log.Printf("In recovery mode: not initialising WSRelay")
 23  		return WSRelay{}
 24  	}
 25  
 26  	return WSRelay{
 27  		config: config,
 28  		socks:  []*WSCONN{},        // all current connections
 29  		relay:  relay,              // recieve Change messages from Dogeboxd to broadcast
 30  		newWs:  make(chan *WSCONN), // recieve new WSCONNs
 31  	}
 32  }
 33  
 34  func (t WSRelay) Run(started, stopped chan bool, stop chan context.Context) error {
 35  	cleanupTime := 10 * time.Second
 36  	cleanup := time.NewTimer(cleanupTime)
 37  	go func() {
 38  		go func() {
 39  		mainloop:
 40  			for {
 41  				select {
 42  				case <-stop:
 43  					break mainloop
 44  				case ws := <-t.newWs:
 45  					t.addSock(ws)
 46  				case v := <-t.relay:
 47  					t.broadcast(v)
 48  				case <-cleanup.C:
 49  					t.cleanupSocks()
 50  					cleanup.Reset(cleanupTime)
 51  				}
 52  			}
 53  		}()
 54  
 55  		started <- true
 56  		<-stop
 57  		for _, sock := range t.socks {
 58  			sock.Close()
 59  		}
 60  		stopped <- true
 61  	}()
 62  	return nil
 63  }
 64  
 65  func (t *WSRelay) cleanupSocks() {
 66  	remaining := []*WSCONN{}
 67  	for _, s := range t.socks {
 68  		if s.IsClosed() {
 69  			continue
 70  		}
 71  		remaining = append(remaining, s)
 72  	}
 73  	t.socks = remaining
 74  }
 75  
 76  func (t *WSRelay) broadcast(v any) {
 77  	for _, ws := range t.socks {
 78  		if ws.IsClosed() {
 79  			continue
 80  		}
 81  		err := websocket.JSON.Send(ws.WS, v)
 82  		if err != nil {
 83  			ws.Close()
 84  		}
 85  	}
 86  }
 87  
 88  func (t *WSRelay) addSock(ws *WSCONN) {
 89  	t.socks = append(t.socks, ws)
 90  }
 91  
 92  func (t WSRelay) GetWSHandler(initialPayloader func() any) *websocket.Server {
 93  	config := &websocket.Config{
 94  		Origin: nil,
 95  	}
 96  	h := websocket.Server{
 97  		Handler: func(ws *websocket.Conn) {
 98  			stop := make(chan bool)
 99  			t.newWs <- &WSCONN{ws, stop}
100  
101  			err := websocket.JSON.Send(ws, initialPayloader())
102  			if err != nil {
103  				fmt.Println("failed to send initial payload", err)
104  			}
105  			<-stop // hold the connection until stopper closes
106  		},
107  		Config: *config,
108  	}
109  	return &h
110  }