zhangzengfei
2023-10-20 cc6d3763795b62d353df0c1eb58cbb736b709829
system-service/serf/handler.go
@@ -1,6 +1,7 @@
package serf
import (
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
@@ -16,6 +17,7 @@
   "time"
   "vamicro/config"
   "vamicro/system-service/bhome_msg_dev"
   "vamicro/system-service/models"
)
// RpcHandle is the function signature of an RPC handler: it receives an
// RpcParamTopic argument and returns a slice of serf.NodeResponse values
// together with an error.
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
@@ -240,8 +242,45 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
      leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{leaveSql})
      // Check whether the node that left was the master node; if so, switch to a new master node.
      var clusterDb models.Node
      clusterNodes, err := clusterDb.FindNodes()
      if err == nil {
         for _, node := range clusterNodes {
            //logger.Info(node.Id, node.DriftState, leaveMember.Name)
            if node.NodeId == leaveMember.Name && node.DriftState == "master" {
               firstNode, _ := clusterDb.FindFirstNode()
               //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
               if firstNode.NodeId == Agent.LocalMember().Name {
                  //logger.Info("update master")
                  clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
                  // Publish a notification that the master node has changed.
                  chMsg := protomsg.DbChangeMessage{
                     Id:     firstNode.ClusterId,
                     Table:  protomsg.TableChanged_T_Cluster,
                     Action: protomsg.DbAction_Insert,
                     Info:   "slave2master",
                  }
                  bts, _ := json.Marshal(chMsg)
                  h := GetBusHandle()
                  if h == nil {
                     logger.Error("HandleEventMemberLeave bus handle is nil")
                     return
                  }
                  err := h.Publish("system-service", bts)
                  if err != nil {
                     logger.Error("HandleEventMemberLeave pub master err:", err)
                  }
               }
            }
         }
      }
      logger.Info("EventMemberLeave,current Members:", ev.Members)
      logLT := ""
@@ -267,7 +306,7 @@
      fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
      //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
      joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
      flag, e := executeSqlByGorm([]string{joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)