基于serf的数据库同步模块库
zhangzengfei
2023-05-15 12f52d5835388f22fdecb2a890d58e5425688c8e
agent.go
@@ -36,6 +36,7 @@
   "basic.com/valib/serf.git/cmd/serf/command/agent"
   //"github.com/apache/servicecomb-service-center/pkg/log"
   "basic.com/valib/logger.git"
   "github.com/satori/go.uuid"
)
const (
@@ -43,6 +44,7 @@
   // Names of the serf events this package recognises; the event-handling code
   // compares ev.Name against the UserEvent* values to pick a branch.
   QueryEventUpdateDBData = "UpdateDBData"
   // NOTE(review): presumably the user event that carries SQL statements to
   // replay locally — the dispatching condition is not visible here; confirm.
   UserEventSyncSql      = "SyncSql"
   // Payload of this user event is forwarded to SyncDbTablePersonCacheChan.
   UserEventSyncDbTablePersonCache      = "SyncCache"
   UserEventSyncVirtualIp = "SyncVirtualIp" // floating (virtual) IP changed; payload is forwarded to SyncVirtualIpChan
)
// Agent wraps the serf agent
@@ -58,6 +60,7 @@
   NodeID      string `json:"nodeID"`
   NodeAddress string `json:"nodeAddress"`
   IsAlive     int    `json:"isAlive"`
   Role        string `json:"role"`
}
// Create creates a serf agent with the given config
@@ -115,6 +118,7 @@
}
// SyncDbTablePersonCacheChan is a buffered channel (capacity 512) that receives
// the raw payload of each UserEventSyncDbTablePersonCache user event from the
// event-handling code in this file.
// NOTE(review): the visible send is a plain channel send with no select/default,
// so it would block once the buffer is full — confirm a consumer always drains it.
var SyncDbTablePersonCacheChan = make(chan []byte,512)
// SyncVirtualIpChan is a buffered channel (capacity 512) that receives the raw
// payload of each UserEventSyncVirtualIp user event (virtual IP change).
// NOTE(review): same blocking-send caveat as SyncDbTablePersonCacheChan — confirm
// a consumer is always reading.
var SyncVirtualIpChan = make(chan []byte, 512)
// HandleEvent handles serf.EventMemberJoin events,
// waiting for members to join until the number of group members equals "groupExpect".
@@ -139,11 +143,26 @@
            go func() {
               flag, e := ExecuteSqlByGorm(sqlUe.Sql)
               logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
               logResult := "0"
               if flag {
                  logResult = "1"
               }
               logErr := ""
               if e != nil {
                  logErr = e.Error()
               }
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
            }()
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
         SyncDbTablePersonCacheChan <- ev.Payload
      } else if ev.Name == UserEventSyncVirtualIp {
         logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
         SyncVirtualIpChan <- ev.Payload
      }
@@ -236,11 +255,21 @@
               addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
               sendErr := rawSendTcpMsg(addr, bytesReturn)
               if sendErr != nil {
                  logger.Debug("sendToTcp err:",sendErr)
               } else {
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name,"'","''")
               logResult := "0"
               logErr := ""
               if sendErr ==nil {
                  logResult = "1"
                  logger.Debug("sendToTcp success")
               } else {
                  logErr = sendErr.Error()
                  logger.Debug("sendToTcp err:",sendErr)
               }
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
            }()
         } else {
            logger.Debug("targetNode is nil")
@@ -258,18 +287,42 @@
         if ev.Members !=nil && len(ev.Members) ==1 {
            leaveMember := ev.Members[0]
            leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'"
            ExecuteSqlByGorm([]string{ leaveSql })
            flag,e := ExecuteSqlByGorm([]string{ leaveSql })
            logger.Info("EventMemberLeave,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := strings.ReplaceAll(leaveSql, "'","''")
            logResult := "0"
            if flag {
               logResult = "1"
            }
            logErr := ""
            if e != nil {
               logErr = e.Error()
            }
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      } else if event.EventType() == serf.EventMemberJoin {
         if ev.Members !=nil && len(ev.Members) ==1 {
            leaveMember := ev.Members[0]
            leaveSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
            ExecuteSqlByGorm([]string{ leaveSql })
            joinSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
            flag,e := ExecuteSqlByGorm([]string{joinSql})
            logger.Info("EventMemberJoin,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := strings.ReplaceAll(joinSql, "'", "''")
            logResult := "0"
            if flag {
               logResult = "1"
            }
            logErr := ""
            if e != nil {
               logErr = e.Error()
            }
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      }
@@ -710,6 +763,7 @@
      node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
      node.IsAlive = int(mb.Status)
      node.ClusterID = mb.Tags[tagKeyClusterID]
      node.Role = mb.Tags["role"]
      nodes = append(nodes, node)
   }