package command

import (
	"flag"
	"fmt"
	"io/ioutil"
	"strings"
	"time"

	sdb "basic.com/pubsub/syncdb.git/db"
	"github.com/hashicorp/serf/client"
	"github.com/hashicorp/serf/cmd/serf/command/agent"
	"github.com/mitchellh/cli"
)

// QueryCommand is a Command implementation that is used to trigger a new
// query and wait for responses and acks.
type QueryCommand struct {
	// ShutdownCh aborts a running query: Run returns 1 as soon as a value
	// can be received from it.
	ShutdownCh <-chan struct{}
	// Ui receives all user-facing output produced by the command.
	Ui cli.Ui
}

var _ cli.Command = &QueryCommand{}

// Help returns the usage text for the query command.
func (c *QueryCommand) Help() string {
	helpText := `
Usage: serf query [options] name payload

  Dispatches a query to the Serf cluster.

Options:

  -format                   If provided, output is returned in the specified
                            format. Valid formats are 'json', and 'text' (default)

  -node=NAME                This flag can be provided multiple times to filter
                            responses to only named nodes.

  -tag key=regexp           This flag can be provided multiple times to filter
                            responses to only nodes matching the tags.

  -timeout="15s"            Providing a timeout overrides the default timeout.

  -no-ack                   Setting this prevents nodes from sending an acknowledgement
                            of the query.

  -relay-factor             If provided, query responses will be relayed through this
                            number of extra nodes for redundancy.

  -rpc-addr=127.0.0.1:7373  RPC address of the Serf agent.

  -rpc-auth=""              RPC auth token of the Serf agent.
`
	return strings.TrimSpace(helpText)
}

// Run parses args, dispatches the query through the agent's RPC interface
// and streams acks and responses to the selected output handler until the
// query completes or ShutdownCh fires. It returns 0 on success and 1 on any
// error or on shutdown.
func (c *QueryCommand) Run(args []string) int {
	var noAck bool
	var nodes []string
	var tags []string
	var timeout time.Duration
	var format string
	var relayFactor int

	cmdFlags := flag.NewFlagSet("query", flag.ContinueOnError)
	cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
	cmdFlags.Var((*agent.AppendSliceValue)(&nodes), "node", "node filter")
	cmdFlags.Var((*agent.AppendSliceValue)(&tags), "tag", "tag filter")
	cmdFlags.DurationVar(&timeout, "timeout", 0, "query timeout")
	cmdFlags.BoolVar(&noAck, "no-ack", false, "no-ack")
	cmdFlags.StringVar(&format, "format", "text", "output format")
	cmdFlags.IntVar(&relayFactor, "relay-factor", 0, "response relay count")
	rpcAddr := RPCAddrFlag(cmdFlags)
	rpcAuth := RPCAuthFlag(cmdFlags)
	if err := cmdFlags.Parse(args); err != nil {
		return 1
	}

	// Setup the filter tags
	filterTags, err := agent.UnmarshalTags(tags)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error: %s", err))
		return 1
	}

	// Exactly one name and at most one payload are accepted.
	args = cmdFlags.Args()
	switch {
	case len(args) < 1:
		c.Ui.Error("A query name must be specified.")
		c.Ui.Error("")
		c.Ui.Error(c.Help())
		return 1
	case len(args) > 2:
		c.Ui.Error("Too many command line arguments. Only a name and payload must be specified.")
		c.Ui.Error("")
		c.Ui.Error(c.Help())
		return 1
	}

	// The relay factor is sent as a uint8, so reject anything that would
	// not survive the conversion below.
	if relayFactor > 255 || relayFactor < 0 {
		c.Ui.Error("Relay factor must be between 0 and 255")
		return 1
	}

	name := args[0]
	var payload []byte
	if len(args) == 2 {
		payload = []byte(args[1])
	}

	cl, err := RPCClient(*rpcAddr, *rpcAuth)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error connecting to Serf agent: %s", err))
		return 1
	}
	defer cl.Close()

	// Setup the response handler
	var handler queryRespFormat
	switch format {
	case "text":
		handler = &textQueryRespFormat{
			ui:    c.Ui,
			name:  name,
			noAck: noAck,
		}
	case "json":
		handler = &jsonQueryRespFormat{
			ui:        c.Ui,
			Responses: make(map[string]string),
		}
	default:
		c.Ui.Error(fmt.Sprintf("Invalid format: %s", format))
		return 1
	}

	ackCh := make(chan string, 128)
	respCh := make(chan client.NodeResponse, 128)

	params := client.QueryParam{
		FilterNodes: nodes,
		FilterTags:  filterTags,
		RequestAck:  !noAck,
		RelayFactor: uint8(relayFactor),
		Timeout:     timeout,
		Name:        name,
		Payload:     payload,
		AckCh:       ackCh,
		RespCh:      respCh,
	}
	if err := cl.Query(&params); err != nil {
		c.Ui.Error(fmt.Sprintf("Error sending query: %s", err))
		return 1
	}
	handler.Started()

	// A zero-valued ack or response ends the loop.
	// NOTE(review): presumably the RPC client closes both channels when the
	// query finishes, which is what yields the zero value — confirm against
	// the client package.
OUTER:
	for {
		select {
		case a := <-ackCh:
			if a == "" {
				break OUTER
			}
			handler.AckReceived(a)

		case r := <-respCh:
			if r.From == "" {
				break OUTER
			}
			handler.ResponseReceived(r)

		case <-c.ShutdownCh:
			return 1
		}
	}

	if err := handler.Finished(); err != nil {
		return 1
	}
	return 0
}

// Synopsis returns the one-line description of the query command.
func (c *QueryCommand) Synopsis() string {
	return "Send a query to the Serf cluster"
}

// queryRespFormat is used to switch our handler based on the format
type queryRespFormat interface {
	Started()
	AckReceived(from string)
	ResponseReceived(resp client.NodeResponse)
	Finished() error
}

// textQueryRespFormat is used to output the results in a human-readable
// format that is streamed as results come in
type textQueryRespFormat struct {
	ui      cli.Ui
	name    string
	noAck   bool
	numAcks int
	numResp int
}

// Started announces that the query has been dispatched.
func (t *textQueryRespFormat) Started() {
	t.ui.Output(fmt.Sprintf("Query '%s' dispatched", t.name))
}

// AckReceived counts the ack and reports which node sent it.
func (t *textQueryRespFormat) AckReceived(from string) {
	t.numAcks++
	t.ui.Info(fmt.Sprintf("Ack from '%s'", from))
}

// ResponseReceived counts the response, writes its payload to the local
// "test.db" file, re-establishes the package-level database connection and
// reports the sender together with the payload size in bytes.
func (t *textQueryRespFormat) ResponseReceived(r client.NodeResponse) {
	t.numResp++

	// Remove the trailing newline if there is one
	// NOTE(review): the trimmed payload is also what gets written to
	// test.db; if responses carry raw binary data whose last byte happens
	// to be '\n', this drops one byte of it — confirm what responders send.
	payload := r.Payload
	if n := len(payload); n > 0 && payload[n-1] == '\n' {
		payload = payload[:n-1]
	}

	// Release the current connection before the file is overwritten. The
	// nil guard avoids calling Close on a connection that was never opened.
	if sdb.Dbconn != nil {
		sdb.Dbconn.Close()
		sdb.Dbconn = nil
	}

	if err := ioutil.WriteFile("test.db", payload, 0644); err != nil {
		t.ui.Error(fmt.Sprintf("query byte to file error! %s", err))
	}

	// Reconnect even if the write failed, since the previous connection has
	// already been closed above.
	if err := sdb.GetConn(); err != nil {
		t.ui.Error(fmt.Sprintf("create db conn of test.db error: %s", err))
	}

	t.ui.Info(fmt.Sprintf("Response from %s: %d", r.From, len(payload)))
}

// Finished prints the ack total (unless acks were disabled) and the
// response total. It always returns nil.
func (t *textQueryRespFormat) Finished() error {
	if !t.noAck {
		t.ui.Output(fmt.Sprintf("Total Acks: %d", t.numAcks))
	}
	t.ui.Output(fmt.Sprintf("Total Responses: %d", t.numResp))
	return nil
}

// jsonQueryRespFormat is used to output the results in a JSON format
type jsonQueryRespFormat struct {
	ui        cli.Ui
	Acks      []string
	Responses map[string]string
}

// Started is a no-op: JSON output is emitted only once, in Finished.
func (j *jsonQueryRespFormat) Started() {}

// AckReceived records the name of the acknowledging node.
func (j *jsonQueryRespFormat) AckReceived(from string) {
	j.Acks = append(j.Acks, from)
}

// ResponseReceived stores the payload as a string keyed by the responding
// node; a later response from the same node replaces the earlier one.
func (j *jsonQueryRespFormat) ResponseReceived(r client.NodeResponse) {
	j.Responses[r.From] = string(r.Payload)
}

// Finished encodes the collected acks and responses with formatOutput and
// writes the result to the UI. An encoding failure is reported to the UI
// and returned.
func (j *jsonQueryRespFormat) Finished() error {
	output, err := formatOutput(j, "json")
	if err != nil {
		j.ui.Error(fmt.Sprintf("Encoding error: %s", err))
		return err
	}
	j.ui.Output(string(output))
	return nil
}