From cac6e9c7b13021b8b2bafa882772370ba847d226 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期五, 27 九月 2019 17:44:05 +0800
Subject: [PATCH] add lamport_time to db
---
agent.go | 21 ++++++++++++++++++++-
1 files changed, 20 insertions(+), 1 deletions(-)
diff --git a/agent.go b/agent.go
index 7483a81..1994157 100644
--- a/agent.go
+++ b/agent.go
@@ -84,6 +84,10 @@
logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
len(conf.EncryptKey), conf.EncryptKey)
+ ltLock.Lock()
+ curLTime = QueryLTimeFromDbByGorm()
+ ltLock.Unlock()
+
return &Agent{
Agent: serfAgent,
conf: conf,
@@ -114,6 +118,8 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
+var curLTime uint64
+var ltLock sync.RWMutex
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -123,6 +129,8 @@
switch ev := event.(type) {
case serf.UserEvent:
+ ltLock.Lock()
+ defer ltLock.Unlock()
if ev.Name == UserEventSyncSql {
var sqlUe SqlUserEvent
err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -131,11 +139,22 @@
return
}
if sqlUe.Owner != a.conf.NodeName {
- //results, err := ExecuteWriteSql(sqlArr)
+ evTime := uint64(ev.LTime)
+ logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
+ if curLTime !=0 && evTime < curLTime{//鏄鐞嗚繃鐨勪簨浠�
+ logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
+ return
+ }
flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
+ if flag {
+ curLTime = evTime
+
+ ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
+ }
}
} else if ev.Name == UserEventSyncDbTablePersonCache {
+ logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
SyncDbTablePersonCacheChan <- ev.Payload
}
--
Gitblit v1.8.0