zhangzengfei
2023-10-08 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f
system-service/serf/serf.go
@@ -23,11 +23,15 @@
   UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
   UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
   DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
   UserEventSyncMessage            = "SyncMessageForProc"         // 为其他进程同步消息
   TcpTransportPort                = 30194                        //tcp传输大数据量接口
   SUserEventSyncMessage
)
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
var SyncVirtualIpChan = make(chan []byte, 512)
var SyncProcMessageChan = make(chan []byte, 512)
func HandleSerfEvent(event serf.Event) {
   switch ev := event.(type) {
@@ -42,6 +46,9 @@
         HandleSyncRegisterInfo(ev)
      } else if ev.Name == DataSystemSerfSubscribe {
         HandleDataSystemSerfSub(ev)
      } else if ev.Name == UserEventSyncMessage {
         logger.Debug("接收到SyncMessageForProc")
         HandleUserEventSyncMessage(ev)
      }
   case *serf.Query:
      if ev.Name == QueryEventUpdateDBData {
@@ -100,6 +107,14 @@
type SqlUserEvent struct {
   Owner string   `json:"owner"`
   Sql   []string `json:"sql"`
}
type ProcMessageEvent struct {
   Owner   string `json:"owner"`    // 发送者
   Target  string `json:"target"`   // 指定接收者
   Proc    string `json:"procName"` // 进程名
   Topic   string `json:"topic"`    // 主题
   Payload []byte `json:"payload"`  // 消息体,自行解析
}
type TableDesc struct {
@@ -193,7 +208,7 @@
   mbs := a.GroupMembers(clusterId)
   var specmembername string
   for _, m := range mbs {
      logger.Info("m", m)
      logger.Info("member", m)
      if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
         if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
            if strings.HasPrefix(m.Name, "DSVAD") {
@@ -206,7 +221,7 @@
         }
      }
   }
   logger.Info("mbs:", mbs, "specmembername:", specmembername)
   logger.Info("members:", mbs, "specmembername:", specmembername)
   if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
      return nil, errors.New("specmembername not found")
   }
@@ -245,8 +260,8 @@
            logger.Info("Query response's len:", len(msg))
            err := json.Unmarshal(msg, &dumpSqls)
            if err == nil {
               logger.Error("dumpSql:", dumpSqls)
               logger.Error("data dump success")
               //logger.Error("dumpSql:", dumpSqls)
               logger.Debug("data dump success")
            }
            return
         }