| | |
| | | "basic.com/valib/bhomeclient.git" |
| | | "basic.com/valib/logger.git" |
| | | "context" |
| | | "encoding/json" |
| | | "github.com/gogo/protobuf/proto" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | |
| | | nodes, e := nodeE.FindNodesByClusterId(c.ClusterId) |
| | | if e == nil && nodes != nil && len(nodes) > 0 { |
| | | var nodeIps []string |
| | | for _, n := range nodes { |
| | | for idx, n := range nodes { |
| | | if n.NodeId != config.Server.AnalyServerId { |
| | | nodeIps = append(nodeIps, n.NodeIp) |
| | | } else { |
| | | nodeE = nodes[idx] |
| | | } |
| | | } |
| | | |
| | |
| | | Agent, err = sdb.Init(c.ClusterId, c.Password, config.Server.AnalyServerId, nodeIps, config.ClusterSet.SerfSnapShotPath, conf) |
| | | if Agent != nil { |
| | | Agent.RegisterHandleEventFunc(HandleSerfEvent) |
| | | |
| | | logger.Debugf("local node:", nodeE) |
| | | if nodeE.DriftState == "slave" { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: nodeE.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "master2slave", |
| | | } |
| | | b, _ := json.Marshal(chMsg) |
| | | err = hms.Publish(bhomeclient.Proc_System_Service, b) |
| | | if err != nil { |
| | | logger.Error(err.Error()) |
| | | } |
| | | } |
| | | logger.Debug("sync.Agent init success!") |
| | | } else { |
| | | logger.Debug("sync.Agent init fail!") |
| | | } |
| | | } |
| | | } |
| | | |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | HandleUpdateMemberStatus() |
| | | time.Sleep(5 * time.Second) |
| | | } |
| | | } |
| | | }() |
| | | |
| | | go func() { |
| | | for { |
| | | select { |
| | |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case b := <-SyncProcMessageChan: |
| | | { |
| | | var procMsg ProcMessageEvent |
| | | err := json.Unmarshal(b, &procMsg) |
| | | if err != nil { |
| | | logger.Error("Unmarshal ProcMessageEvent ", err.Error()) |
| | | } else { |
| | | err = hms.Publish(procMsg.Topic, procMsg.Payload) |
| | | if err != nil { |
| | | logger.Error("hms.Publish error ", err.Error()) |
| | | } |
| | | } |
| | | |
| | | } |
| | | default: |
| | | time.Sleep(50 * time.Millisecond) |
| | | } |
| | | } |
| | | }() |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case b := <-syncSdkCompareCacheChan: |
| | | { |
| | | logger.Debug("SyncSdkCompareCache in,len(b):", len(b)) |