zhangzengfei
2023-10-20 cc6d3763795b62d353df0c1eb58cbb736b709829
system-service/serf/handler.go
@@ -1,9 +1,11 @@
package serf
import (
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "fmt"
   "github.com/golang/protobuf/proto"
   "github.com/hashicorp/memberlist"
   "github.com/satori/go.uuid"
@@ -15,6 +17,7 @@
   "time"
   "vamicro/config"
   "vamicro/system-service/bhome_msg_dev"
   "vamicro/system-service/models"
)
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
@@ -77,8 +80,25 @@
   logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
   SyncVirtualIpChan <- ev.Payload
}
func HandleUserEventSyncMessage(ev serf.UserEvent) {
   logger.Info("receive a UserEventSyncMessage event")
   var procMsg ProcMessageEvent
   err := json.Unmarshal(ev.Payload, &procMsg)
   if err != nil {
      logger.Error("sqlUe unmarshal err:", err)
      return
   }
//收到其它节点主动将注册中心的所有topic通知到集群中
   // 自己发送的消息不处理
   if procMsg.Owner != config.Server.AnalyServerId {
      // 判断是否有指定的接收目标
      if procMsg.Target == "" || procMsg.Target == config.Server.AnalyServerId {
         SyncProcMessageChan <- ev.Payload
      }
   }
}
// 收到其它节点主动将注册中心的所有topic通知到集群中
func HandleSyncRegisterInfo(ev serf.UserEvent) {
   logger.Debug("HandleSyncRegisterInfo")
   var si bhome_msg_dev.MsgDevRegisterInfo
@@ -167,7 +187,7 @@
   }
}
//处理其他的一些query请求
// 处理其他的一些query请求
func HandleOtherQuery(ev *serf.Query) {
   var reqBody RequestSerfTopicMsg
   var ret []byte
@@ -222,8 +242,45 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{leaveSql})
      // 判断离开是否是主节点离开, 更换主节点
      var clusterDb models.Node
      clusterNodes, err := clusterDb.FindNodes()
      if err == nil {
         for _, node := range clusterNodes {
            //logger.Info(node.Id, node.DriftState, leaveMember.Name)
            if node.NodeId == leaveMember.Name && node.DriftState == "master" {
               firstNode, _ := clusterDb.FindFirstNode()
               //logger.Info("firstNode:", firstNode.NodeId, "local:", Agent.LocalMember().Name)
               if firstNode.NodeId == Agent.LocalMember().Name {
                  //logger.Info("update master")
                  clusterDb.UpdateDriftStateByNodeId("master", firstNode.NodeId)
                  // 通知主节点变更
                  chMsg := protomsg.DbChangeMessage{
                     Id:     firstNode.ClusterId,
                     Table:  protomsg.TableChanged_T_Cluster,
                     Action: protomsg.DbAction_Insert,
                     Info:   "slave2master",
                  }
                  bts, _ := json.Marshal(chMsg)
                  h := GetBusHandle()
                  if h == nil {
                     logger.Error("HandleEventMemberLeave bus handle is nil")
                     return
                  }
                  err := h.Publish("system-service", bts)
                  if err != nil {
                     logger.Error("HandleEventMemberLeave pub master err:", err)
                  }
               }
            }
         }
      }
      logger.Info("EventMemberLeave,current Members:", ev.Members)
      logLT := ""
@@ -243,8 +300,13 @@
func HandleEventMemberJoin(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      joinMember := ev.Members[0]
      timeUnix := time.Now().Unix()
      fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
      //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s' and isDelete=1", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
      flag, e := executeSqlByGorm([]string{joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -259,6 +321,6 @@
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}