基于serf的数据库同步模块库
liuxiaolong
2019-09-28 c02c0ac5e06ee1a556107baa071ef0c80a387a13
rm ltime save and get from sqlite
1个文件已修改
21 ■■■■ 已修改文件
agent.go 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -84,10 +84,6 @@
    logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
    ltLock.Lock()
    curLTime = QueryLTimeFromDbByGorm()
    ltLock.Unlock()
    return &Agent{
        Agent:   serfAgent,
        conf:    conf,
@@ -118,8 +114,6 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
var curLTime uint64
var ltLock sync.RWMutex
// HandleEvent handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -129,8 +123,6 @@
    switch ev := event.(type) {
    case serf.UserEvent:
        ltLock.Lock()
        defer ltLock.Unlock()
        if ev.Name == UserEventSyncSql {
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
@@ -139,19 +131,12 @@
                return
            }
            if sqlUe.Owner != a.conf.NodeName {
                evTime := uint64(ev.LTime)
                logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
                if curLTime !=0 && evTime < curLTime{//是处理过的事件
                    logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
                    return
                }
                logger.Info("ev.LTime:",evTime,",SqlUserEvent.sql:",sqlUe.Sql)
                flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
                logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
                if flag {
                    curLTime = evTime
                    ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
                }
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))