zhangzengfei
2023-09-04 e8e536d1cb52d2126c8c7ce2ba1c7a76f7208678
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package service
 
import (
    "basic.com/syncdb.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/client"
    "basic.com/valib/serf.git/cmd/serf/command/agent"
    "context"
    "github.com/mitchellh/cli"
    "vamicro/config"
    "vamicro/extend/util"
)
 
//serf daemon进程,集群中只有主节点才会启动,非主节点处于停止状态。
func Start(ctx context.Context) error {
    clusterId := ""
    password := config.ClusterSet.PwdPre+"123456"
    var addrs []string
 
    conf := syncdb.DefaultConfig()
    conf.RPCAddr = "0.0.0.0"
    conf.RPCPort = 7474
    conf.BindAddr = "0.0.0.0:30290"
    conf.Ctx = ctx
    agent, err := syncdb.Init(clusterId, password, config.Server.AnalyServerId, addrs, "./serf-daemon.json", conf)
    if err != nil {
        return err
    }
    agent.RegisterHandleEventFunc(HandleSerfEvent)
    logger.Debug("agent:", agent, "err:", err)
    return nil
}
 
func TestSerfRPC() {
    shutdownCh := make(chan struct{})
    doneCh := make(chan struct{})
    defer func() {
        close(shutdownCh)
        <-doneCh
    }()
 
    ui := new(cli.MockUi)
    c := &agent.Command{
        ShutdownCh: shutdownCh,
        Ui: ui,
    }
 
    ip, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
    rpcAddr := ip + ":7373"
    args := []string {
        "-bind", ip,
        "-rpc-addr", rpcAddr,
    }
 
    go func() {
        code := c.Run(args)
        if code != 0 {
            logger.Debug("run serf Command code:", code)
        }
        close(doneCh)
    }()
    rpcClient, err := client.NewRPCClient(rpcAddr)
    if err != nil {
        logger.Debug("client.NewRPCClient err:", err)
        return
    }
    members, err := rpcClient.Members()
    if err != nil {
        logger.Debug("rpc members err:", err)
    }
    logger.Debug("len(members):", len(members))
    param := client.QueryParam{
        FilterNodes: []string{},
        Name: "",
        Payload: []byte(""),
    }
    rpcClient.Query(&param)
}