| | |
| | | logger.Info("[INFO] agent: Restored keyring with %d keys from %s", |
| | | len(conf.EncryptKey), conf.EncryptKey) |
| | | |
| | | ltLock.Lock() |
| | | curLTime = QueryLTimeFromDbByGorm() |
| | | ltLock.Unlock() |
| | | |
| | | return &Agent{ |
| | | Agent: serfAgent, |
| | | conf: conf, |
| | |
| | | } |
| | | |
// Package-level state shared by the agent's user-event handling.
var (
	// SyncDbTablePersonCacheChan carries the raw payloads of
	// UserEventSyncDbTablePersonCache events. The channel is unbuffered, so a
	// send blocks until a receiver takes the payload.
	SyncDbTablePersonCacheChan = make(chan []byte)

	// curLTime is the Lamport time of the most recently applied sync-SQL user
	// event; 0 means no event time has been recorded yet.
	curLTime uint64

	// ltLock guards curLTime.
	ltLock sync.RWMutex
)
| | | |
| | | // HandleEvent Handles serf.EventMemberJoin events, |
| | | // which will wait for members to join until the number of group members is equal to "groupExpect" |
| | |
| | | |
| | | switch ev := event.(type) { |
| | | case serf.UserEvent: |
| | | ltLock.Lock() |
| | | defer ltLock.Unlock() |
| | | if ev.Name == UserEventSyncSql { |
| | | var sqlUe SqlUserEvent |
| | | err := json.Unmarshal(ev.Payload, &sqlUe) |
| | |
| | | return |
| | | } |
| | | if sqlUe.Owner != a.conf.NodeName { |
| | | //results, err := ExecuteWriteSql(sqlArr) |
| | | evTime := uint64(ev.LTime) |
| | | logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql) |
| | | if curLTime !=0 && evTime < curLTime{//是处理过的事件 |
| | | logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql) |
| | | return |
| | | } |
| | | flag, _ := ExecuteSqlByGorm(sqlUe.Sql) |
| | | logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag) |
| | | if flag { |
| | | curLTime = evTime |
| | | |
| | | ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"}) |
| | | } |
| | | } |
| | | } else if ev.Name == UserEventSyncDbTablePersonCache { |
| | | logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload)) |
| | | SyncDbTablePersonCacheChan <- ev.Payload |
| | | } |
| | | |
| | |
| | | "os" |
| | | "os/exec" |
| | | "path/filepath" |
| | | "strconv" |
| | | "strings" |
| | | "sync" |
| | | "github.com/jinzhu/gorm" |
| | |
| | | return false,errors.New("localDb is nil") |
| | | } |
| | | |
// SyncSerf maps one row of the sync_serf table, which persists the Lamport
// time of the last applied sync event.
type SyncSerf struct {
	// LamportTime holds the lamport_time column. The value is kept as a
	// string and parsed as a base-10 unsigned integer by
	// QueryLTimeFromDbByGorm.
	LamportTime string `json:"lamport_time"`
}
| | | |
| | | func QueryLTimeFromDbByGorm() uint64 { |
| | | if localDb != nil { |
| | | var syncSerf []SyncSerf |
| | | err := localDb.Raw("select * from sync_serf").Scan(&syncSerf).Error |
| | | if err == nil && len(syncSerf) > 0 { |
| | | ltStr := syncSerf[0].LamportTime |
| | | logger.Info("db.LamportTime str:", ltStr) |
| | | t, e := strconv.ParseUint(ltStr, 10, 64) |
| | | if e != nil { |
| | | logger.Error("db.LamportTime parseUint err:", e) |
| | | } else { |
| | | curLTime = t |
| | | } |
| | | logger.Info("db.LamportTime:", ltStr) |
| | | |
| | | } else { |
| | | logger.Error("get db.LamportTime err:", err) |
| | | } |
| | | } |
| | | return 0 |
| | | } |
| | | |
| | | type TableDesc struct { |
| | | Cid int `json:"cid"` |
| | | Name string `json:"name"` |