package serf import ( "basic.com/syncdb.git" "basic.com/valib/logger.git" "basic.com/valib/serf.git/client" "strconv" "time" ) type RpcParam struct { Name string Timeout time.Duration FilterNodes []string FilterTags []string Data []byte } // RpcParamTopic,放在RpcParam的Data中 type RpcParamTopic struct { Topic string `json:"topic"` Data []byte `json:"data"` } // RpcQuery func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) { rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort) c, err := client.NewRPCClient(rpcAddr) if err != nil { return nil, err } defer c.Close() ackCh := make(chan string) respCh := make(chan client.NodeResponse) param := client.QueryParam { Name: req.Name, FilterNodes: req.FilterNodes, Payload: req.Data, Timeout: req.Timeout, AckCh: ackCh, RespCh: respCh, } if err = c.Query(¶m); err != nil { return nil, err } members, err := c.Members() logger.Debug("len(members):", len(members), " err:", err) //不确定返回的响应数量,以及响应时间 respArr := make([]client.NodeResponse, 0) loop: for { select { case <-time.After(req.Timeout): break loop case r := <-respCh: respArr = append(respArr, r) logger.Debug("response :", string(r.Payload), " from:", r.From) default: time.Sleep(100 * time.Millisecond) } } return respArr, nil } // RpcUserEvent func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error { rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort) c, err := client.NewRPCClient(rpcAddr) if err != nil { return err } defer c.Close() err = c.UserEvent(name, payload, coalesce) return err }