package serf
|
|
import (
|
"basic.com/syncdb.git"
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/client"
|
"strconv"
|
"time"
|
)
|
|
type RpcParam struct {
|
Name string
|
Timeout time.Duration
|
FilterNodes []string
|
FilterTags []string
|
Data []byte
|
}
|
|
// RpcParamTopic,放在RpcParam的Data中
|
type RpcParamTopic struct {
|
Topic string `json:"topic"`
|
Data []byte `json:"data"`
|
}
|
|
// RpcQuery
|
func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) {
|
rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort)
|
c, err := client.NewRPCClient(rpcAddr)
|
if err != nil {
|
return nil, err
|
}
|
defer c.Close()
|
ackCh := make(chan string)
|
respCh := make(chan client.NodeResponse)
|
|
param := client.QueryParam {
|
Name: req.Name,
|
FilterNodes: req.FilterNodes,
|
Payload: req.Data,
|
Timeout: req.Timeout,
|
AckCh: ackCh,
|
RespCh: respCh,
|
}
|
|
if err = c.Query(¶m); err != nil {
|
return nil, err
|
}
|
members, err := c.Members()
|
logger.Debug("len(members):", len(members), " err:", err)
|
|
//不确定返回的响应数量,以及响应时间
|
respArr := make([]client.NodeResponse, 0)
|
loop:
|
for {
|
select {
|
case <-time.After(req.Timeout):
|
break loop
|
case r := <-respCh:
|
respArr = append(respArr, r)
|
logger.Debug("response :", string(r.Payload), " from:", r.From)
|
default:
|
time.Sleep(100 * time.Millisecond)
|
}
|
}
|
return respArr, nil
|
}
|
|
// RpcUserEvent
|
func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error {
|
rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort)
|
c, err := client.NewRPCClient(rpcAddr)
|
if err != nil {
|
return err
|
}
|
defer c.Close()
|
|
err = c.UserEvent(name, payload, coalesce)
|
return err
|
}
|