zhangzengfei
2023-09-04 e8e536d1cb52d2126c8c7ce2ba1c7a76f7208678
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package serf
 
import (
    "basic.com/syncdb.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/client"
    "strconv"
    "time"
)
 
type RpcParam struct {
    Name                 string
    Timeout             time.Duration
    FilterNodes         []string
    FilterTags             []string
    Data                 []byte
}
 
// RpcParamTopic,放在RpcParam的Data中
type RpcParamTopic struct {
    Topic                 string     `json:"topic"`
    Data                  []byte    `json:"data"`
}
 
// RpcQuery
func RpcQuery(rpcIp string, req *RpcParam) ([]client.NodeResponse, error) {
    rpcAddr := rpcIp +":"+strconv.Itoa(syncdb.DefaultRPCPort)
    c, err := client.NewRPCClient(rpcAddr)
    if err != nil {
        return nil, err
    }
    defer c.Close()
    ackCh := make(chan string)
    respCh := make(chan client.NodeResponse)
 
    param := client.QueryParam {
        Name: req.Name,
        FilterNodes: req.FilterNodes,
        Payload: req.Data,
        Timeout: req.Timeout,
        AckCh: ackCh,
        RespCh: respCh,
    }
 
    if err = c.Query(&param); err != nil {
        return nil, err
    }
    members, err := c.Members()
    logger.Debug("len(members):", len(members), " err:", err)
 
    //不确定返回的响应数量,以及响应时间
    respArr := make([]client.NodeResponse, 0)
loop:
    for {
        select {
        case <-time.After(req.Timeout):
            break loop
        case r := <-respCh:
            respArr = append(respArr, r)
            logger.Debug("response :", string(r.Payload), " from:", r.From)
        default:
            time.Sleep(100 * time.Millisecond)
        }
    }
    return respArr, nil
}
 
// RpcUserEvent
func RpcUserEvent(rpcIp string, rpcPort int, name string, payload []byte, coalesce bool) error {
    rpcAddr := rpcIp +":"+strconv.Itoa(rpcPort)
    c, err := client.NewRPCClient(rpcAddr)
    if err != nil {
        return err
    }
    defer c.Close()
 
    err = c.UserEvent(name, payload, coalesce)
    return err
}