基于serf的数据库同步模块库
chenshijun
2019-09-28 790130e3b01a9daa0ba10fe1510410bf4139347a
agent.go
@@ -84,10 +84,6 @@
   logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
      len(conf.EncryptKey), conf.EncryptKey)
   ltLock.Lock()
   curLTime = QueryLTimeFromDbByGorm()
   ltLock.Unlock()
   return &Agent{
      Agent:   serfAgent,
      conf:    conf,
@@ -118,8 +114,6 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
var curLTime uint64
var ltLock sync.RWMutex
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -129,8 +123,6 @@
   switch ev := event.(type) {
   case serf.UserEvent:
      ltLock.Lock()
      defer ltLock.Unlock()
      if ev.Name == UserEventSyncSql {
         var sqlUe SqlUserEvent
         err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -139,19 +131,12 @@
            return
         }
         if sqlUe.Owner != a.conf.NodeName {
            evTime := uint64(ev.LTime)
            logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
            if curLTime !=0 && evTime < curLTime{//是处理过的事件
               logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
               return
            }
            logger.Info("ev.LTime:",evTime,",SqlUserEvent.sql:",sqlUe.Sql)
            flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
            logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
            if flag {
               curLTime = evTime
               ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
            }
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))