zhangzengfei
2023-10-20 71b8885babe6dfd25c91b007018347c0c1bfac74
system-service/serf/serf.go
@@ -21,13 +21,18 @@
   UserEventSyncSql                = "SyncSql"
   UserEventSyncDbTablePersonCache = "SyncCache"
   UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
   UserEventChangeMaster           = "ChangeMaster "              //修改节点状态
   UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
   DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
   UserEventSyncMessage            = "SyncMessageForProc"         // 为其他进程同步消息
   TcpTransportPort                = 30194                        //tcp传输大数据量接口
   SUserEventSyncMessage
)
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
var SyncVirtualIpChan = make(chan []byte, 512)
var SyncProcMessageChan = make(chan []byte, 512)
func HandleSerfEvent(event serf.Event) {
   switch ev := event.(type) {
@@ -38,10 +43,15 @@
         HandleUserEventSyncDbTablePersonCache(ev)
      } else if ev.Name == UserEventSyncVirtualIp {
         HandleUserEventSyncVirtualIp(ev)
      } else if ev.Name == UserEventChangeMaster {
         HandleUserEventChangeMaster(ev)
      } else if ev.Name == UserEventSyncRegisterInfo {
         HandleSyncRegisterInfo(ev)
      } else if ev.Name == DataSystemSerfSubscribe {
         HandleDataSystemSerfSub(ev)
      } else if ev.Name == UserEventSyncMessage {
         logger.Debug("接收到SyncMessageForProc")
         HandleUserEventSyncMessage(ev)
      }
   case *serf.Query:
      if ev.Name == QueryEventUpdateDBData {
@@ -54,10 +64,12 @@
   case serf.MemberEvent:
      if event.EventType() == serf.EventMemberLeave {
         HandleEventMemberLeave(ev)
      } else if event.EventType() == serf.EventMemberJoin {
      } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate {
         HandleEventMemberJoin(ev)
      } else if event.EventType() == serf.EventMemberFailed {
         HandleEventMemberFail(ev)
      }
      logger.Error("serf MemberEvent ", event.EventType())
   default:
      logger.Warn("Unknown event type: %s\n", ev.EventType().String())
   }
@@ -83,11 +95,11 @@
               logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
               return false, err
            }
            if result.RowsAffected == 0 {
               logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
               err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
               return false, err
            }
            //if result.RowsAffected == 0 {
            //   logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
            //   err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
            //   return false, err
            //}
         }
         tx.Commit()
         return true, nil
@@ -100,6 +112,14 @@
type SqlUserEvent struct {
   Owner string   `json:"owner"`
   Sql   []string `json:"sql"`
}
type ProcMessageEvent struct {
   Owner   string `json:"owner"`    // 发送者
   Target  string `json:"target"`   // 指定接收者
   Proc    string `json:"procName"` // 进程名
   Topic   string `json:"topic"`    // 主题
   Payload []byte `json:"payload"`  // 消息体,自行解析
}
type TableDesc struct {
@@ -193,7 +213,7 @@
   mbs := a.GroupMembers(clusterId)
   var specmembername string
   for _, m := range mbs {
      logger.Info("m", m)
      logger.Info("member", m)
      if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
         if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
            if strings.HasPrefix(m.Name, "DSVAD") {
@@ -206,7 +226,7 @@
         }
      }
   }
   logger.Info("mbs:", mbs, "specmembername:", specmembername)
   logger.Info("members:", mbs, "specmembername:", specmembername)
   if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
      return nil, errors.New("specmembername not found")
   }
@@ -245,8 +265,8 @@
            logger.Info("Query response's len:", len(msg))
            err := json.Unmarshal(msg, &dumpSqls)
            if err == nil {
               logger.Error("dumpSql:", dumpSqls)
               logger.Error("data dump success")
               //logger.Error("dumpSql:", dumpSqls)
               logger.Debug("data dump success")
            }
            return
         }
@@ -267,7 +287,7 @@
      return
   }
   err = Agent.UserEvent(UserEventSyncSql, ueB, false)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      logger.Error("err: ", err)
   if err != nil {
      logger.Error("sending sync sql event err: ", err)
   }
}