/ event-handler.go
event-handler.go
1 package main 2 3 import ( 4 "bytes" 5 6 "encoding/json" 7 "net/http" 8 "os/exec" 9 10 "github.com/AcalephStorage/consul-alerts/consul" 11 12 log "github.com/AcalephStorage/consul-alerts/Godeps/_workspace/src/github.com/Sirupsen/logrus" 13 ) 14 15 type EventProcessor struct { 16 inChan chan []consul.Event 17 closeChan chan struct{} 18 firstRun bool 19 } 20 21 func (ep *EventProcessor) start() { 22 cleanup := false 23 for !cleanup { 24 select { 25 case events := <-ep.inChan: 26 ep.handleEvents(events) 27 case <-ep.closeChan: 28 cleanup = true 29 } 30 } 31 } 32 33 func (ep *EventProcessor) stop() { 34 close(ep.closeChan) 35 } 36 37 func (ep *EventProcessor) handleEvents(events []consul.Event) { 38 for _, event := range events { 39 log.Println("----------------------------------------") 40 log.Printf("Processing event %s:\n", event.ID) 41 log.Println("----------------------------------------") 42 eventHandlers := consulClient.EventHandlers(event.Name) 43 for _, eventHandler := range eventHandlers { 44 data, err := json.Marshal(&event) 45 if err != nil { 46 log.Println("Unable to read event: ", event) 47 // then what? 48 } 49 50 input := bytes.NewReader(data) 51 output := new(bytes.Buffer) 52 cmd := exec.Command(eventHandler) 53 cmd.Stdin = input 54 cmd.Stdout = output 55 cmd.Stderr = output 56 57 if err := cmd.Run(); err != nil { 58 log.Println("error running handler: ", err) 59 } else { 60 log.Printf(">>> \n%s -> %s:\n %s\n", event.ID, eventHandler, output) 61 } 62 63 } 64 log.Printf("Event Processed.\n\n") 65 } 66 } 67 68 func (ep *EventProcessor) eventHandler(w http.ResponseWriter, r *http.Request) { 69 consulClient.LoadConfig() 70 if ep.firstRun { 71 log.Println("Now watching for events.") 72 ep.firstRun = false 73 // set status to OK 74 return 75 } 76 77 if !consulClient.EventsEnabled() { 78 log.Println("Event handling disabled. Event ignored.") 79 // set to OK? 80 return 81 } 82 83 var events []consul.Event 84 toWatchObject(r.Body, &events) 85 ep.inChan <- events 86 // set status to OK 87 } 88 89 func startEventProcessor() *EventProcessor { 90 ep := &EventProcessor{ 91 inChan: make(chan []consul.Event, 1), 92 closeChan: make(chan struct{}), 93 firstRun: true, 94 } 95 go ep.start() 96 return ep 97 }