package agent import ( "fmt" "log" "os" "strings" "sync" sdb "basic.com/pubsub/syncdb.git/db" "github.com/hashicorp/serf/serf" ) // EventHandler is a handler that does things when events happen. type EventHandler interface { HandleEvent(serf.Event) } // ScriptEventHandler invokes scripts for the events that it receives. type ScriptEventHandler struct { SelfFunc func() serf.Member Scripts []EventScript Logger *log.Logger scriptLock sync.Mutex newScripts []EventScript } func (h *ScriptEventHandler) HandleEvent(e serf.Event) { switch ev := e.(type) { case serf.MemberEvent: sdb.Dumpdb() // do staf case serf.UserEvent: fmt.Println(string(ev.Payload)) var tmpstringslice []string tmpstringslice = append(tmpstringslice, string(ev.Payload)) fmt.Println(tmpstringslice) results, err := sdb.DoExecute(tmpstringslice) for _, result := range results { fmt.Println(result, "results err: ", err) } case *serf.Query: //bak file and send resp filename, err := sdb.BakDbFile() if err != nil { fmt.Println("bak db file error!") return } fmt.Println(filename) filebuf, err := filetobytes(filename) fmt.Println("filebuf: ", len(filebuf)) if err != nil { fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { fmt.Printf("remove file%s\n failed", filename) return } fmt.Println("query payload: ", len(ev.Payload)) if query, ok := e.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { fmt.Printf("err: %s\n", err) return } } default: fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } // Swap in the new scripts if any h.scriptLock.Lock() if h.newScripts != nil { h.Scripts = h.newScripts h.newScripts = nil } h.scriptLock.Unlock() if h.Logger == nil { h.Logger = log.New(os.Stderr, "", log.LstdFlags) } self := h.SelfFunc() for _, script := range h.Scripts { if !script.Invoke(e) { continue } err := invokeEventScript(h.Logger, script.Script, self, e) if err != nil { h.Logger.Printf("[ERR] agent: Error invoking script '%s': %s", script.Script, err) } } } // UpdateScripts is used to safely update the scripts we invoke in // a thread safe manner func (h *ScriptEventHandler) UpdateScripts(scripts []EventScript) { h.scriptLock.Lock() defer h.scriptLock.Unlock() h.newScripts = scripts } // EventFilter is used to filter which events are processed type EventFilter struct { Event string Name string } // Invoke tests whether or not this event script should be invoked // for the given Serf event. func (s *EventFilter) Invoke(e serf.Event) bool { if s.Event == "*" { return true } if e.EventType().String() != s.Event { return false } if s.Event == "user" && s.Name != "" { userE, ok := e.(serf.UserEvent) if !ok { return false } if userE.Name != s.Name { return false } } if s.Event == "query" && s.Name != "" { query, ok := e.(*serf.Query) if !ok { return false } if query.Name != s.Name { return false } } return true } // Valid checks if this is a valid agent event script. func (s *EventFilter) Valid() bool { switch s.Event { case "member-join": case "member-leave": case "member-failed": case "member-update": case "member-reap": case "user": case "query": case "*": default: return false } return true } // EventScript is a single event script that will be executed in the // case of an event, and is configured from the command-line or from // a configuration file. type EventScript struct { EventFilter Script string } func (s *EventScript) String() string { if s.Name != "" { return fmt.Sprintf("Event '%s:%s' invoking '%s'", s.Event, s.Name, s.Script) } return fmt.Sprintf("Event '%s' invoking '%s'", s.Event, s.Script) } // ParseEventScript takes a string in the format of "type=script" and // parses it into an EventScript struct, if it can. func ParseEventScript(v string) []EventScript { var filter, script string parts := strings.SplitN(v, "=", 2) if len(parts) == 1 { script = parts[0] } else { filter = parts[0] script = parts[1] } filters := ParseEventFilter(filter) results := make([]EventScript, 0, len(filters)) for _, filt := range filters { result := EventScript{ EventFilter: filt, Script: script, } results = append(results, result) } return results } // ParseEventFilter a string with the event type filters and // parses it into a series of EventFilters if it can. func ParseEventFilter(v string) []EventFilter { // No filter translates to stream all if v == "" { v = "*" } events := strings.Split(v, ",") results := make([]EventFilter, 0, len(events)) for _, event := range events { var result EventFilter var name string if strings.HasPrefix(event, "user:") { name = event[len("user:"):] event = "user" } else if strings.HasPrefix(event, "query:") { name = event[len("query:"):] event = "query" } result.Event = event result.Name = name results = append(results, result) } return results }