基于serf的数据库同步模块库
zhangzengfei
2023-05-15 12f52d5835388f22fdecb2a890d58e5425688c8e
agent.go
@@ -44,6 +44,7 @@
   QueryEventUpdateDBData = "UpdateDBData"
   UserEventSyncSql      = "SyncSql"
   UserEventSyncDbTablePersonCache      = "SyncCache"
   UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改
)
// Agent warps the serf agent
@@ -59,6 +60,7 @@
   NodeID      string `json:"nodeID"`
   NodeAddress string `json:"nodeAddress"`
   IsAlive     int    `json:"isAlive"`
   Role        string `json:"role"`
}
// Create create serf agent with config
@@ -116,6 +118,7 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,512)
var SyncVirtualIpChan = make(chan []byte, 512)
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -151,12 +154,15 @@
               if e != nil {
                  logErr = e.Error()
               }
               ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
            }()
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
         SyncDbTablePersonCacheChan <- ev.Payload
      } else if ev.Name == UserEventSyncVirtualIp {
         logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
         SyncVirtualIpChan <- ev.Payload
      }
@@ -263,7 +269,7 @@
               }
               ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
            }()
         } else {
            logger.Debug("targetNode is nil")
@@ -295,7 +301,7 @@
            if e != nil {
               logErr = e.Error()
            }
            ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      } else if event.EventType() == serf.EventMemberJoin {
@@ -316,7 +322,7 @@
            if e != nil {
               logErr = e.Error()
            }
            ExecuteQueryByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      }
@@ -757,6 +763,7 @@
      node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
      node.IsAlive = int(mb.Status)
      node.ClusterID = mb.Tags[tagKeyClusterID]
      node.Role = mb.Tags["role"]
      nodes = append(nodes, node)
   }