zhangzengfei
2023-11-28 3a706d3378aa3626501370352963883fd2783558
system-service/serf/sync.go
@@ -6,6 +6,7 @@
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/logger.git"
   "context"
   "encoding/json"
   "github.com/gogo/protobuf/proto"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/req"
@@ -51,9 +52,11 @@
      nodes, e := nodeE.FindNodesByClusterId(c.ClusterId)
      if e == nil && nodes != nil && len(nodes) > 0 {
         var nodeIps []string
         for _, n := range nodes {
         for idx, n := range nodes {
            if n.NodeId != config.Server.AnalyServerId {
               nodeIps = append(nodeIps, n.NodeIp)
            } else {
               nodeE = nodes[idx]
            }
         }
@@ -68,12 +71,40 @@
         Agent, err = sdb.Init(c.ClusterId, c.Password, config.Server.AnalyServerId, nodeIps, config.ClusterSet.SerfSnapShotPath, conf)
         if Agent != nil {
            Agent.RegisterHandleEventFunc(HandleSerfEvent)
            logger.Debugf("local node:", nodeE)
            if nodeE.DriftState == "slave" {
               chMsg := protomsg.DbChangeMessage{
                  Id:     nodeE.ClusterId,
                  Table:  protomsg.TableChanged_T_Cluster,
                  Action: protomsg.DbAction_Insert,
                  Info:   "master2slave",
               }
               b, _ := json.Marshal(chMsg)
               err = hms.Publish(bhomeclient.Proc_System_Service, b)
               if err != nil {
                  logger.Error(err.Error())
               }
            }
            logger.Debug("sync.Agent init success!")
         } else {
            logger.Debug("sync.Agent init fail!")
         }
      }
   }
   go func() {
      for {
         select {
         case <-ctx.Done():
            return
         default:
            HandleUpdateMemberStatus()
            time.Sleep(5 * time.Second)
         }
      }
   }()
   go func() {
      for {
         select {
@@ -112,6 +143,30 @@
         select {
         case <-ctx.Done():
            return
         case b := <-SyncProcMessageChan:
            {
               var procMsg ProcMessageEvent
               err := json.Unmarshal(b, &procMsg)
               if err != nil {
                  logger.Error("Unmarshal ProcMessageEvent ", err.Error())
               } else {
                  err = hms.Publish(procMsg.Topic, procMsg.Payload)
                  if err != nil {
                     logger.Error("hms.Publish error ", err.Error())
                  }
               }
            }
         default:
            time.Sleep(50 * time.Millisecond)
         }
      }
   }()
   go func() {
      for {
         select {
         case <-ctx.Done():
            return
         case b := <-syncSdkCompareCacheChan:
            {
               logger.Debug("SyncSdkCompareCache in,len(b):", len(b))