基于serf的数据库同步模块库
zhangzengfei
2022-05-24 4bbe900cd12b5a27280ac5e39b40980598d992b1
searcher.go
@@ -2,14 +2,14 @@
import (
   "encoding/json"
   "fmt"
   "basic.com/valib/logger.git"
   "time"
   "github.com/hashicorp/memberlist"
)
var (
   members [][]byte
   members = make(map[string]NodeInfo,0)
   delay   time.Duration
)
@@ -53,8 +53,12 @@
// NotifyMsg is called when a user-data message is received
func (d *delegate) NotifyMsg(b []byte) {
   // logMsg(b)
   fmt.Println(b)
   members = append(members, b)
   n := NodeInfo{}
   if err:= json.Unmarshal(b, &n);err ==nil {
      members[n.NodeID] = n
   } else {
      logger.Error("NotifyMsg msg unmarshal err")
   }
}
func logMsg(b []byte) {
@@ -66,14 +70,15 @@
   node := nodeInfo{}
   if err := json.Unmarshal(b, &node); err != nil {
      fmt.Println("Umarshal failed:", err)
      logger.Error("Umarshal failed:", err)
      return
   }
   fmt.Println(node)
   logger.Info(node)
}
func CreateSearchNode(key string) (*memberlist.Memberlist, error) {
   members = make(map[string]NodeInfo,0)
   conf := memberlist.DefaultLocalConfig()
   conf.Events = &eventDelegate{}
   conf.Delegate = &delegate{}
@@ -83,7 +88,7 @@
   keyring, err := memberlist.NewKeyring(nil, []byte(key))
   if err != nil {
      fmt.Printf("Failed to restore keyring: %s", err)
      logger.Error("Failed to restore keyring: %s", err)
      return nil, err
   }
   conf.Keyring = keyring
@@ -91,10 +96,10 @@
   return memberlist.Create(conf)
}
func CreateSearchNodeWhitClose(key string, delay time.Duration) [][]byte {
func CreateSearchNodeWhitClose(key string, delay time.Duration) map[string]NodeInfo {
   m, err := CreateSearchNode(key)
   if err == nil {
      // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
      //logger.Info("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
      time.Sleep(delay)
      m.Shutdown()
@@ -106,6 +111,6 @@
   return m.Shutdown()
}
func GetSearchNodes() [][]byte {
func GetSearchNodes() map[string]NodeInfo {
   return members
}