chenshijun
2020-01-15 4d6bafff86a6863450622c96f00661b9eb686b90
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package syncdb
 
import (
    "encoding/json"
    "fmt"
    "time"
 
    "github.com/hashicorp/memberlist"
)
 
var (
    members = make(map[string]NodeInfo,0)
    delay   time.Duration
)
 
// delegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type delegate struct{}
 
// NodeMeta is the delegate method, must implement.
func (d *delegate) NodeMeta(limit int) []byte {
    return []byte{}
}
 
// LocalState is the delegate method, must implement.
func (d *delegate) LocalState(join bool) []byte {
    return []byte{}
}
 
// MergeRemoteState is the delegate method, must implement.
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
}
 
// GetBroadcasts is the delegate method, must implement.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    return [][]byte{}
}
 
// eventDelegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type eventDelegate struct{}
 
// NotifyJoin is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) {
}
 
// NotifyLeave is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) {
}
 
// NotifyUpdate is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) {
}
 
// NotifyMsg is called when a user-data message is received
func (d *delegate) NotifyMsg(b []byte) {
    // logMsg(b)
    n := NodeInfo{}
    if err:= json.Unmarshal(b, &n);err ==nil {
        members[n.NodeID] = n
    } else {
        fmt.Println("NotifyMsg msg unmarshal err")
    }
}
 
func logMsg(b []byte) {
    type nodeInfo struct {
        NodeName string `json:"name"`
        Address  string `json:"address"`
    }
 
    node := nodeInfo{}
    if err := json.Unmarshal(b, &node); err != nil {
 
        fmt.Println("Umarshal failed:", err)
        return
    }
 
    fmt.Println(node)
}
 
func CreateSearchNode(key string) (*memberlist.Memberlist, error) {
    conf := memberlist.DefaultLocalConfig()
    conf.Events = &eventDelegate{}
    conf.Delegate = &delegate{}
    conf.BindAddr = BroadcastIP
    conf.BindPort = BroadcastPort
    conf.Name = "Cluster-Searcher"
 
    keyring, err := memberlist.NewKeyring(nil, []byte(key))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        return nil, err
    }
    conf.Keyring = keyring
 
    return memberlist.Create(conf)
}
 
func CreateSearchNodeWhitClose(key string, delay time.Duration) map[string]NodeInfo {
    m, err := CreateSearchNode(key)
    if err == nil {
        // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
        time.Sleep(delay)
 
        m.Shutdown()
    }
    return members
}
 
func CloseSearchNode(m *memberlist.Memberlist) error {
    return m.Shutdown()
}
 
func GetSearchNodes() map[string]NodeInfo {
    return members
}