From cac6e9c7b13021b8b2bafa882772370ba847d226 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期五, 27 九月 2019 17:44:05 +0800
Subject: [PATCH] add lamport_time to db

---
 agent.go  |   21 ++++++++++++++++++++-
 dbself.go |   27 +++++++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletions(-)

diff --git a/agent.go b/agent.go
index 7483a81..1994157 100644
--- a/agent.go
+++ b/agent.go
@@ -84,6 +84,10 @@
 	logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
 		len(conf.EncryptKey), conf.EncryptKey)
 
+	ltLock.Lock()
+	curLTime = QueryLTimeFromDbByGorm()
+	ltLock.Unlock()
+
 	return &Agent{
 		Agent:   serfAgent,
 		conf:    conf,
@@ -114,6 +118,8 @@
 }
 
 var SyncDbTablePersonCacheChan = make(chan []byte,0)
+var curLTime uint64
+var ltLock sync.RWMutex
 
 // HandleEvent Handles serf.EventMemberJoin events,
 // which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -123,6 +129,8 @@
 
 	switch ev := event.(type) {
 	case serf.UserEvent:
+		ltLock.Lock()
+		defer ltLock.Unlock()
 		if ev.Name == UserEventSyncSql {
 			var sqlUe SqlUserEvent
 			err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -131,11 +139,22 @@
 				return
 			}
 			if sqlUe.Owner != a.conf.NodeName {
-				//results, err := ExecuteWriteSql(sqlArr)
+				evTime := uint64(ev.LTime)
+				logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
+				if curLTime !=0 && evTime < curLTime{//鏄鐞嗚繃鐨勪簨浠�
+					logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
+					return
+				}
 				flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
 				logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
+				if flag {
+					curLTime = evTime
+
+					ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
+				}
 			}
 		} else if ev.Name == UserEventSyncDbTablePersonCache {
+			logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
 			SyncDbTablePersonCacheChan <- ev.Payload
 		}
 
diff --git a/dbself.go b/dbself.go
index 884b867..a3a9140 100644
--- a/dbself.go
+++ b/dbself.go
@@ -6,6 +6,7 @@
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"github.com/jinzhu/gorm"
@@ -132,6 +133,32 @@
 	return false,errors.New("localDb is nil")
 }
 
+type SyncSerf struct {
+	LamportTime string `json:"lamport_time"`
+}
+
+func QueryLTimeFromDbByGorm() uint64 {
+	if localDb != nil {
+		var syncSerf []SyncSerf
+		err := localDb.Raw("select * from sync_serf").Scan(&syncSerf).Error
+		if err == nil && len(syncSerf) > 0 {
+			ltStr := syncSerf[0].LamportTime
+			logger.Info("db.LamportTime str:", ltStr)
+			t, e := strconv.ParseUint(ltStr, 10, 64)
+			if e != nil {
+				logger.Error("db.LamportTime parseUint err:", e)
+			} else {
+				curLTime = t
+			}
+			logger.Info("db.LamportTime:", ltStr)
+
+		} else {
+			logger.Error("get db.LamportTime err:", err)
+		}
+	}
+	return 0
+}
+
 type TableDesc struct {
 	Cid int `json:"cid"`
 	Name string `json:"name"`

--
Gitblit v1.8.0