zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/serf/handler.go
@@ -1,264 +1,264 @@
package serf
import (
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "github.com/golang/protobuf/proto"
   "github.com/hashicorp/memberlist"
   "github.com/satori/go.uuid"
   "path/filepath"
   "reflect"
   "runtime"
   "strconv"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/bhome_msg_dev"
)
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
var rpcHandlers map[string]RpcHandle
func init() {
   rpcHandlers = make(map[string]RpcHandle)
}
// RegisterRpcHandles
func RegisterRpcHandles(fs ...RpcHandle) {
   for _, f := range fs {
      name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
      nameEnd := filepath.Ext(name)
      name = strings.TrimPrefix(nameEnd, ".")
      if _, ok := rpcHandlers[name]; !ok {
         rpcHandlers[name] = f
      }
   }
}
/*****************************UserEvent***************************************/
func HandleUserEventSyncSql(ev serf.UserEvent) {
   logger.Info("receive a UserEventSyncSql event")
   var sqlUe SqlUserEvent
   err := json.Unmarshal(ev.Payload, &sqlUe)
   if err != nil {
      logger.Error("sqlUe unmarshal err:", err)
      return
   }
   logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
   if sqlUe.Owner != config.Server.AnalyServerId {
      go func() {
         flag, e := executeSqlByGorm(sqlUe.Sql)
         logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
         logLT := strconv.Itoa(int(ev.LTime))
         logT := time.Now().Format("2006-01-02 15:04:05")
         logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
         logResult := "0"
         if flag {
            logResult = "1"
         }
         logErr := ""
         if e != nil {
            logErr = e.Error()
         }
         executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
      }()
   }
}
func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
   logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
   SyncDbTablePersonCacheChan <- ev.Payload
}
func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
   logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
   SyncVirtualIpChan <- ev.Payload
}
//收到其它节点主动将注册中心的所有topic通知到集群中
func HandleSyncRegisterInfo(ev serf.UserEvent) {
   logger.Debug("HandleSyncRegisterInfo")
   var si bhome_msg_dev.MsgDevRegisterInfo
   if err := proto.Unmarshal(ev.Payload, &si); err == nil {
      logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
      if string(si.DevId) != config.Server.AnalyServerId {
         compareRPool(&si)
      }
   } else {
      logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
   }
}
func HandleDataSystemSerfSub(ev serf.UserEvent) {
   h := GetBusHandle()
   if h == nil {
      logger.Error("HandleDataSystemSerfSub bus handle is nil")
      return
   }
   err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
   if err != nil {
      logger.Error("HandleDataSystemSerfSub pub err:", err)
   }
}
/*****************************Query***************************************/
func HandleQueryEventUpdateDBData(ev *serf.Query) {
   logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
   var fromP QueryTableDataParam
   err := json.Unmarshal(ev.Payload, &fromP)
   if err != nil {
      logger.Error("Query tableNames unmarshal err")
      if err := ev.Respond([]byte("request unmarshal err")); err != nil {
         logger.Error("query.Respond err: %s\n", err)
         return
      }
      return
   }
   logger.Info("Query tableNames:", fromP.Tables)
   datas, err := DumpTables(fromP.Tables)
   if err != nil {
      logger.Error("queryByGorm err:", err)
      if err := ev.Respond([]byte("queryByGorm err")); err != nil {
         logger.Error("query.Respond err: %s\n", err)
         return
      }
      return
   }
   bytesReturn, err := json.Marshal(datas)
   logger.Info("results.len: ", len(bytesReturn))
   var targetNode *memberlist.Node
   nodes := Agent.Serf().Memberlist().Members()
   if nodes != nil && len(nodes) > 0 {
      for _, n := range nodes {
         if n.Name == fromP.From {
            targetNode = n
            break
         }
      }
   }
   logger.Debug("targetNode:", targetNode.Name)
   if targetNode != nil {
      go func() {
         addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
         sendErr := rawSendTcpMsg(addr, bytesReturn)
         logLT := strconv.Itoa(int(ev.LTime))
         logT := time.Now().Format("2006-01-02 15:04:05")
         logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
         logResult := "0"
         logErr := ""
         if sendErr == nil {
            logResult = "1"
            logger.Debug("sendToTcp success")
         } else {
            logErr = sendErr.Error()
            logger.Debug("sendToTcp err:", sendErr)
         }
         executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
      }()
   } else {
      logger.Debug("targetNode is nil")
   }
}
//处理其他的一些query请求
func HandleOtherQuery(ev *serf.Query) {
   var reqBody RequestSerfTopicMsg
   var ret []byte
   if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
      ret = []byte(err.Error())
   } else {
      if err, data := QueryLocalProc(reqBody); err != nil {
         ret = []byte(err.Error())
      } else {
         b, e := json.Marshal(data)
         if e != nil {
            ret = []byte(e.Error())
         } else {
            ret = b
         }
      }
   }
   if err := ev.Respond(ret); err != nil {
      logger.Debug("HandleOtherQuery err:", err)
      return
   }
}
func HandleQueryRpc(ev *serf.Query) {
   var ret []byte
   var arg RpcParamTopic
   err := json.Unmarshal(ev.Payload, &arg)
   if err == nil {
      if f, ok := rpcHandlers[arg.Topic]; ok {
         resp, e := f(arg)
         if e == nil {
            if data, me := json.Marshal(resp); me == nil {
               ret = data
            } else {
               logger.Debug("marshal resp err:", e)
            }
         } else {
            logger.Debug("call f err:", e)
         }
      } else {
         logger.Debug("rpcHandlers not contains topic:", arg.Topic)
      }
   } else {
      logger.Debug("unmarshal RpcParamTopic err:", err)
   }
   if rErr := ev.Respond(ret); rErr != nil {
      logger.Debug("HandleQueryRpc err:", rErr)
   }
}
func HandleEventMemberLeave(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{leaveSql})
      logger.Info("EventMemberLeave,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(leaveSql, "'", "''")
      logResult := "0"
      if flag {
         logResult = "1"
      }
      logErr := ""
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}
func HandleEventMemberJoin(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(joinSql, "'", "''")
      logResult := "0"
      if flag {
         logResult = "1"
      }
      logErr := ""
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}
package serf
import (
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "github.com/golang/protobuf/proto"
   "github.com/hashicorp/memberlist"
   "github.com/satori/go.uuid"
   "path/filepath"
   "reflect"
   "runtime"
   "strconv"
   "strings"
   "time"
   "vamicro/config"
   "vamicro/system-service/bhome_msg_dev"
)
type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
var rpcHandlers map[string]RpcHandle
func init() {
   rpcHandlers = make(map[string]RpcHandle)
}
// RegisterRpcHandles
func RegisterRpcHandles(fs ...RpcHandle) {
   for _, f := range fs {
      name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
      nameEnd := filepath.Ext(name)
      name = strings.TrimPrefix(nameEnd, ".")
      if _, ok := rpcHandlers[name]; !ok {
         rpcHandlers[name] = f
      }
   }
}
/*****************************UserEvent***************************************/
func HandleUserEventSyncSql(ev serf.UserEvent) {
   logger.Info("receive a UserEventSyncSql event")
   var sqlUe SqlUserEvent
   err := json.Unmarshal(ev.Payload, &sqlUe)
   if err != nil {
      logger.Error("sqlUe unmarshal err:", err)
      return
   }
   logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
   if sqlUe.Owner != config.Server.AnalyServerId {
      go func() {
         flag, e := executeSqlByGorm(sqlUe.Sql)
         logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
         logLT := strconv.Itoa(int(ev.LTime))
         logT := time.Now().Format("2006-01-02 15:04:05")
         logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
         logResult := "0"
         if flag {
            logResult = "1"
         }
         logErr := ""
         if e != nil {
            logErr = e.Error()
         }
         executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
      }()
   }
}
func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
   logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
   SyncDbTablePersonCacheChan <- ev.Payload
}
func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
   logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
   SyncVirtualIpChan <- ev.Payload
}
//收到其它节点主动将注册中心的所有topic通知到集群中
func HandleSyncRegisterInfo(ev serf.UserEvent) {
   logger.Debug("HandleSyncRegisterInfo")
   var si bhome_msg_dev.MsgDevRegisterInfo
   if err := proto.Unmarshal(ev.Payload, &si); err == nil {
      logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
      if string(si.DevId) != config.Server.AnalyServerId {
         compareRPool(&si)
      }
   } else {
      logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
   }
}
func HandleDataSystemSerfSub(ev serf.UserEvent) {
   h := GetBusHandle()
   if h == nil {
      logger.Error("HandleDataSystemSerfSub bus handle is nil")
      return
   }
   err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
   if err != nil {
      logger.Error("HandleDataSystemSerfSub pub err:", err)
   }
}
/*****************************Query***************************************/
func HandleQueryEventUpdateDBData(ev *serf.Query) {
   logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
   var fromP QueryTableDataParam
   err := json.Unmarshal(ev.Payload, &fromP)
   if err != nil {
      logger.Error("Query tableNames unmarshal err")
      if err := ev.Respond([]byte("request unmarshal err")); err != nil {
         logger.Error("query.Respond err: %s\n", err)
         return
      }
      return
   }
   logger.Info("Query tableNames:", fromP.Tables)
   datas, err := DumpTables(fromP.Tables)
   if err != nil {
      logger.Error("queryByGorm err:", err)
      if err := ev.Respond([]byte("queryByGorm err")); err != nil {
         logger.Error("query.Respond err: %s\n", err)
         return
      }
      return
   }
   bytesReturn, err := json.Marshal(datas)
   logger.Info("results.len: ", len(bytesReturn))
   var targetNode *memberlist.Node
   nodes := Agent.Serf().Memberlist().Members()
   if nodes != nil && len(nodes) > 0 {
      for _, n := range nodes {
         if n.Name == fromP.From {
            targetNode = n
            break
         }
      }
   }
   logger.Debug("targetNode:", targetNode.Name)
   if targetNode != nil {
      go func() {
         addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
         sendErr := rawSendTcpMsg(addr, bytesReturn)
         logLT := strconv.Itoa(int(ev.LTime))
         logT := time.Now().Format("2006-01-02 15:04:05")
         logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
         logResult := "0"
         logErr := ""
         if sendErr == nil {
            logResult = "1"
            logger.Debug("sendToTcp success")
         } else {
            logErr = sendErr.Error()
            logger.Debug("sendToTcp err:", sendErr)
         }
         executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
      }()
   } else {
      logger.Debug("targetNode is nil")
   }
}
//处理其他的一些query请求
func HandleOtherQuery(ev *serf.Query) {
   var reqBody RequestSerfTopicMsg
   var ret []byte
   if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
      ret = []byte(err.Error())
   } else {
      if err, data := QueryLocalProc(reqBody); err != nil {
         ret = []byte(err.Error())
      } else {
         b, e := json.Marshal(data)
         if e != nil {
            ret = []byte(e.Error())
         } else {
            ret = b
         }
      }
   }
   if err := ev.Respond(ret); err != nil {
      logger.Debug("HandleOtherQuery err:", err)
      return
   }
}
func HandleQueryRpc(ev *serf.Query) {
   var ret []byte
   var arg RpcParamTopic
   err := json.Unmarshal(ev.Payload, &arg)
   if err == nil {
      if f, ok := rpcHandlers[arg.Topic]; ok {
         resp, e := f(arg)
         if e == nil {
            if data, me := json.Marshal(resp); me == nil {
               ret = data
            } else {
               logger.Debug("marshal resp err:", e)
            }
         } else {
            logger.Debug("call f err:", e)
         }
      } else {
         logger.Debug("rpcHandlers not contains topic:", arg.Topic)
      }
   } else {
      logger.Debug("unmarshal RpcParamTopic err:", err)
   }
   if rErr := ev.Respond(ret); rErr != nil {
      logger.Debug("HandleQueryRpc err:", rErr)
   }
}
func HandleEventMemberLeave(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{leaveSql})
      logger.Info("EventMemberLeave,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(leaveSql, "'", "''")
      logResult := "0"
      if flag {
         logResult = "1"
      }
      logErr := ""
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}
func HandleEventMemberJoin(ev serf.MemberEvent) {
   if ev.Members != nil && len(ev.Members) == 1 {
      leaveMember := ev.Members[0]
      joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
      flag, e := executeSqlByGorm([]string{joinSql})
      logger.Info("EventMemberJoin,current Members:", ev.Members)
      logLT := ""
      logT := time.Now().Format("2006-01-02 15:04:05")
      logSql := strings.ReplaceAll(joinSql, "'", "''")
      logResult := "0"
      if flag {
         logResult = "1"
      }
      logErr := ""
      if e != nil {
         logErr = e.Error()
      }
      executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
   }
}