基于serf的数据库同步模块库
zhangzengfei
2023-05-15 12f52d5835388f22fdecb2a890d58e5425688c8e
agent.go
@@ -44,6 +44,7 @@
   QueryEventUpdateDBData = "UpdateDBData"
   UserEventSyncSql      = "SyncSql"
   UserEventSyncDbTablePersonCache      = "SyncCache"
   UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改
)
// Agent warps the serf agent
@@ -59,6 +60,7 @@
   NodeID      string `json:"nodeID"`
   NodeAddress string `json:"nodeAddress"`
   IsAlive     int    `json:"isAlive"`
   Role        string `json:"role"`
}
// Create create serf agent with config
@@ -116,6 +118,7 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,512)
var SyncVirtualIpChan = make(chan []byte, 512)
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -142,7 +145,7 @@
               logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := strings.Join(sqlUe.Sql, ";")
               logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
               logResult := "0"
               if flag {
                  logResult = "1"
@@ -151,12 +154,15 @@
               if e != nil {
                  logErr = e.Error()
               }
               ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
            }()
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
         SyncDbTablePersonCacheChan <- ev.Payload
      } else if ev.Name == UserEventSyncVirtualIp {
         logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
         SyncVirtualIpChan <- ev.Payload
      }
@@ -251,7 +257,7 @@
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := "QueryEventUpdateDBData from "+targetNode.Name
               logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name,"'","''")
               logResult := "0"
               logErr := ""
               if sendErr ==nil {
@@ -263,7 +269,7 @@
               }
               ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
            }()
         } else {
            logger.Debug("targetNode is nil")
@@ -286,7 +292,7 @@
            logger.Info("EventMemberLeave,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := leaveSql
            logSql := strings.ReplaceAll(leaveSql, "'","''")
            logResult := "0"
            if flag {
               logResult = "1"
@@ -295,7 +301,7 @@
            if e != nil {
               logErr = e.Error()
            }
            ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      } else if event.EventType() == serf.EventMemberJoin {
@@ -307,7 +313,7 @@
            logger.Info("EventMemberJoin,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := joinSql
            logSql := strings.ReplaceAll(joinSql, "'", "''")
            logResult := "0"
            if flag {
               logResult = "1"
@@ -316,7 +322,7 @@
            if e != nil {
               logErr = e.Error()
            }
            ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      }
@@ -757,6 +763,7 @@
      node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
      node.IsAlive = int(mb.Status)
      node.ClusterID = mb.Tags[tagKeyClusterID]
      node.Role = mb.Tags["role"]
      nodes = append(nodes, node)
   }