zhangzengfei
2023-10-20 71b8885babe6dfd25c91b007018347c0c1bfac74
system-service/serf/handler.go
@@ -80,6 +80,44 @@
   logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
   SyncVirtualIpChan <- ev.Payload
}
func HandleUserEventChangeMaster(ev serf.UserEvent) {
   masterId := string(ev.Payload)
   localId := Agent.LocalMember().Name
   logger.Info("变为主节点的id,", masterId, "本节点id:", localId)
   if masterId == localId {
      return
   }
   var clusterDb models.Node
   localNode, err := clusterDb.FindNodeById(localId)
   if err == nil {
      if localNode.DriftState == "master" {
         logger.Info("本节点为之前的master,通知变更为slave")
         // 通知主节点变更
         chMsg := protomsg.DbChangeMessage{
            Id:     localNode.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "master2slave",
         }
         bts, _ := json.Marshal(chMsg)
         h := GetBusHandle()
         if h == nil {
            logger.Error("HandleEventMemberLeave bus handle is nil")
            return
         }
         err := h.Publish("system-service", bts)
         if err != nil {
            logger.Error("HandleEventMemberLeave pub master err:", err)
         }
      }
   }
   clusterDb.UpdateDriftStateByNodeId("master", masterId, false)
}
func HandleUserEventSyncMessage(ev serf.UserEvent) {
   logger.Info("receive a UserEventSyncMessage event")
   var procMsg ProcMessageEvent
@@ -242,47 +280,46 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      logger.Info("Event Member Leave, Members:", ev.Members)
      leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{leaveSql})
      // 判断离开是否是主节点离开, 更换主节点
      var clusterDb models.Node
      clusterNodes, err := clusterDb.FindNodes()
      leaveNode, err := clusterDb.FindNodeById(leaveMember.Name)
      if err == nil {
         for _, node := range clusterNodes {
            //logger.Info(node.Id, node.DriftState, leaveMember.Name)
            if node.NodeId == leaveMember.Name && node.DriftState == "master" {
               firstNode, _ := clusterDb.FindFirstNode()
               //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
               if firstNode.NodeId == Agent.LocalMember().Name {
                  //logger.Info("update master")
                  clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
         logger.Info("查询离开节点的信息,", leaveNode)
         if leaveNode.DriftState == "master" {
            firstNode, _ := clusterDb.FindFirstNode()
            logger.Info("离开的节点为主节点, 查询加入最早的节点, ", firstNode)
            logger.Info("本节点信息:", Agent.LocalMember().Name)
            if firstNode.NodeId == Agent.LocalMember().Name {
               logger.Info("更新本节点为主节点")
               clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId, true)
                  // 通知主节点变更
                  chMsg := protomsg.DbChangeMessage{
                     Id:     firstNode.ClusterId,
                     Table:  protomsg.TableChanged_T_Cluster,
                     Action: protomsg.DbAction_Insert,
                     Info:   "slave2master",
                  }
               // 通知主节点变更
               chMsg := protomsg.DbChangeMessage{
                  Id:     firstNode.ClusterId,
                  Table:  protomsg.TableChanged_T_Cluster,
                  Action: protomsg.DbAction_Insert,
                  Info:   "slave2master",
               }
                  bts, _ := json.Marshal(chMsg)
                  h := GetBusHandle()
                  if h == nil {
                     logger.Error("HandleEventMemberLeave bus handle is nil")
                     return
                  }
                  err := h.Publish("system-service", bts)
                  if err != nil {
                     logger.Error("HandleEventMemberLeave pub master err:", err)
                  }
               bts, _ := json.Marshal(chMsg)
               h := GetBusHandle()
               if h == nil {
                  logger.Error("HandleEventMemberLeave bus handle is nil")
                  return
               }
               err := h.Publish("system-service", bts)
               if err != nil {
                  logger.Error("HandleEventMemberLeave pub master err:", err)
               }
            }
         }
      }
      logger.Info("EventMemberLeave,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(leaveSql, "'", "''")
@@ -305,8 +342,35 @@
      timeUnix := time.Now().Unix()
      fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
      //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      // 更新在线状态
      updateOnlineSql := fmt.Sprintf("update cluster_node set online=1 where node_id='%s'", joinMember.Name)
      // 如果是新加入的更新创建时间
      joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
      flag, e := executeSqlByGorm([]string{updateOnlineSql, joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(joinSql, "'", "''")
      logResult := "0"
      if flag {
         logResult = "1"
      }
      logErr := ""
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}
func HandleEventMemberFail(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      joinMember := ev.Members[0]
      //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      joinSql := fmt.Sprintf("update cluster_node set online=0 where node_id='%s'", joinMember.Name)
      flag, e := executeSqlByGorm([]string{joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -324,3 +388,22 @@
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}
func HandleUpdateMemberStatus() {
   if Agent == nil {
      return
   }
   for _, member := range Agent.Serf().Members() {
      alive := 0
      if member.Status == serf.StatusAlive {
         alive = 1
      }
      sql := fmt.Sprintf("update cluster_node set online=%d where node_id='%s'", alive, member.Name)
      _, err := executeSqlByGorm([]string{sql})
      if err != nil {
         logger.Error(err)
      }
   }
}