/ event-handler.go
event-handler.go
 1  package main
 2  
 3  import (
 4  	"bytes"
 5  
 6  	"encoding/json"
 7  	"net/http"
 8  	"os/exec"
 9  
10  	"github.com/AcalephStorage/consul-alerts/consul"
11  
12  	log "github.com/AcalephStorage/consul-alerts/Godeps/_workspace/src/github.com/Sirupsen/logrus"
13  )
14  
15  type EventProcessor struct {
16  	inChan    chan []consul.Event
17  	closeChan chan struct{}
18  	firstRun  bool
19  }
20  
21  func (ep *EventProcessor) start() {
22  	cleanup := false
23  	for !cleanup {
24  		select {
25  		case events := <-ep.inChan:
26  			ep.handleEvents(events)
27  		case <-ep.closeChan:
28  			cleanup = true
29  		}
30  	}
31  }
32  
33  func (ep *EventProcessor) stop() {
34  	close(ep.closeChan)
35  }
36  
37  func (ep *EventProcessor) handleEvents(events []consul.Event) {
38  	for _, event := range events {
39  		log.Println("----------------------------------------")
40  		log.Printf("Processing event %s:\n", event.ID)
41  		log.Println("----------------------------------------")
42  		eventHandlers := consulClient.EventHandlers(event.Name)
43  		for _, eventHandler := range eventHandlers {
44  			data, err := json.Marshal(&event)
45  			if err != nil {
46  				log.Println("Unable to read event: ", event)
47  				// then what?
48  			}
49  
50  			input := bytes.NewReader(data)
51  			output := new(bytes.Buffer)
52  			cmd := exec.Command(eventHandler)
53  			cmd.Stdin = input
54  			cmd.Stdout = output
55  			cmd.Stderr = output
56  
57  			if err := cmd.Run(); err != nil {
58  				log.Println("error running handler: ", err)
59  			} else {
60  				log.Printf(">>> \n%s -> %s:\n %s\n", event.ID, eventHandler, output)
61  			}
62  
63  		}
64  		log.Printf("Event Processed.\n\n")
65  	}
66  }
67  
68  func (ep *EventProcessor) eventHandler(w http.ResponseWriter, r *http.Request) {
69  	consulClient.LoadConfig()
70  	if ep.firstRun {
71  		log.Println("Now watching for events.")
72  		ep.firstRun = false
73  		// set status to OK
74  		return
75  	}
76  
77  	if !consulClient.EventsEnabled() {
78  		log.Println("Event handling disabled. Event ignored.")
79  		// set to OK?
80  		return
81  	}
82  
83  	var events []consul.Event
84  	toWatchObject(r.Body, &events)
85  	ep.inChan <- events
86  	// set status to OK
87  }
88  
89  func startEventProcessor() *EventProcessor {
90  	ep := &EventProcessor{
91  		inChan:    make(chan []consul.Event, 1),
92  		closeChan: make(chan struct{}),
93  		firstRun:  true,
94  	}
95  	go ep.start()
96  	return ep
97  }