基于serf的数据库同步模块库
liuxiaolong
2019-09-27 cac6e9c7b13021b8b2bafa882772370ba847d226
add lamport_time to db
2个文件已修改
48 ■■■■■ 已修改文件
agent.go 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -84,6 +84,10 @@
    logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
    ltLock.Lock()
    curLTime = QueryLTimeFromDbByGorm()
    ltLock.Unlock()
    return &Agent{
        Agent:   serfAgent,
        conf:    conf,
@@ -114,6 +118,8 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
var curLTime uint64
var ltLock sync.RWMutex
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -123,6 +129,8 @@
    switch ev := event.(type) {
    case serf.UserEvent:
        ltLock.Lock()
        defer ltLock.Unlock()
        if ev.Name == UserEventSyncSql {
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -131,11 +139,22 @@
                return
            }
            if sqlUe.Owner != a.conf.NodeName {
                //results, err := ExecuteWriteSql(sqlArr)
                evTime := uint64(ev.LTime)
                logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
                if curLTime !=0 && evTime < curLTime{//是处理过的事件
                    logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
                    return
                }
                flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
                logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
                if flag {
                    curLTime = evTime
                    ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
                }
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
            SyncDbTablePersonCacheChan <- ev.Payload
        }
dbself.go
@@ -6,6 +6,7 @@
    "os"
    "os/exec"
    "path/filepath"
    "strconv"
    "strings"
    "sync"
    "github.com/jinzhu/gorm"
@@ -132,6 +133,32 @@
    return false,errors.New("localDb is nil")
}
type SyncSerf struct {
    LamportTime string `json:"lamport_time"`
}
func QueryLTimeFromDbByGorm() uint64 {
    if localDb != nil {
        var syncSerf []SyncSerf
        err := localDb.Raw("select * from sync_serf").Scan(&syncSerf).Error
        if err == nil && len(syncSerf) > 0 {
            ltStr := syncSerf[0].LamportTime
            logger.Info("db.LamportTime str:", ltStr)
            t, e := strconv.ParseUint(ltStr, 10, 64)
            if e != nil {
                logger.Error("db.LamportTime parseUint err:", e)
            } else {
                curLTime = t
            }
            logger.Info("db.LamportTime:", ltStr)
        } else {
            logger.Error("get db.LamportTime err:", err)
        }
    }
    return 0
}
type TableDesc struct {
    Cid int `json:"cid"`
    Name string `json:"name"`