基于serf的数据库同步模块库
liuxiaolong
2019-09-27 cac6e9c7b13021b8b2bafa882772370ba847d226
agent.go
@@ -84,6 +84,10 @@
   logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
      len(conf.EncryptKey), conf.EncryptKey)
   ltLock.Lock()
   curLTime = QueryLTimeFromDbByGorm()
   ltLock.Unlock()
   return &Agent{
      Agent:   serfAgent,
      conf:    conf,
@@ -114,6 +118,8 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
var curLTime uint64
var ltLock sync.RWMutex
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -123,6 +129,8 @@
   switch ev := event.(type) {
   case serf.UserEvent:
      ltLock.Lock()
      defer ltLock.Unlock()
      if ev.Name == UserEventSyncSql {
         var sqlUe SqlUserEvent
         err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -131,11 +139,22 @@
            return
         }
         if sqlUe.Owner != a.conf.NodeName {
            //results, err := ExecuteWriteSql(sqlArr)
            evTime := uint64(ev.LTime)
            logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
            if curLTime !=0 && evTime < curLTime{//是处理过的事件
               logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
               return
            }
            flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
            logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
            if flag {
               curLTime = evTime
               ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
            }
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
         SyncDbTablePersonCacheChan <- ev.Payload
      }
@@ -286,8 +305,8 @@
// Stop serf agent
func (a *Agent) Stop() {
   if a.errorCh != nil {
      a.Leave()
      a.Shutdown()
      logger.Info("a.Shutdown()", a.Leave())
      logger.Info("a.Shutdown()", a.Shutdown())
      close(a.errorCh)
      a.errorCh = nil
   }