websocket_updates.go
1 package web 2 3 import ( 4 "context" 5 "fmt" 6 "log" 7 "time" 8 9 dogeboxd "github.com/dogeorg/dogeboxd/pkg" 10 "golang.org/x/net/websocket" 11 ) 12 13 type WSRelay struct { 14 config dogeboxd.ServerConfig 15 socks []*WSCONN 16 relay chan dogeboxd.Change 17 newWs chan *WSCONN 18 } 19 20 func NewWSRelay(config dogeboxd.ServerConfig, relay chan dogeboxd.Change) WSRelay { 21 if config.Recovery { 22 log.Printf("In recovery mode: not initialising WSRelay") 23 return WSRelay{} 24 } 25 26 return WSRelay{ 27 config: config, 28 socks: []*WSCONN{}, // all current connections 29 relay: relay, // recieve Change messages from Dogeboxd to broadcast 30 newWs: make(chan *WSCONN), // recieve new WSCONNs 31 } 32 } 33 34 func (t WSRelay) Run(started, stopped chan bool, stop chan context.Context) error { 35 cleanupTime := 10 * time.Second 36 cleanup := time.NewTimer(cleanupTime) 37 go func() { 38 go func() { 39 mainloop: 40 for { 41 select { 42 case <-stop: 43 break mainloop 44 case ws := <-t.newWs: 45 t.addSock(ws) 46 case v := <-t.relay: 47 t.broadcast(v) 48 case <-cleanup.C: 49 t.cleanupSocks() 50 cleanup.Reset(cleanupTime) 51 } 52 } 53 }() 54 55 started <- true 56 <-stop 57 for _, sock := range t.socks { 58 sock.Close() 59 } 60 stopped <- true 61 }() 62 return nil 63 } 64 65 func (t *WSRelay) cleanupSocks() { 66 remaining := []*WSCONN{} 67 for _, s := range t.socks { 68 if s.IsClosed() { 69 continue 70 } 71 remaining = append(remaining, s) 72 } 73 t.socks = remaining 74 } 75 76 func (t *WSRelay) broadcast(v any) { 77 for _, ws := range t.socks { 78 if ws.IsClosed() { 79 continue 80 } 81 err := websocket.JSON.Send(ws.WS, v) 82 if err != nil { 83 ws.Close() 84 } 85 } 86 } 87 88 func (t *WSRelay) addSock(ws *WSCONN) { 89 t.socks = append(t.socks, ws) 90 } 91 92 func (t WSRelay) GetWSHandler(initialPayloader func() any) *websocket.Server { 93 config := &websocket.Config{ 94 Origin: nil, 95 } 96 h := websocket.Server{ 97 Handler: func(ws *websocket.Conn) { 98 stop := make(chan bool) 99 t.newWs <- &WSCONN{ws, stop} 100 101 err := websocket.JSON.Send(ws, initialPayloader()) 102 if err != nil { 103 fmt.Println("failed to send initial payload", err) 104 } 105 <-stop // hold the connection until stopper closes 106 }, 107 Config: *config, 108 } 109 return &h 110 }