基于serf的数据库同步模块库
liuxiaolong
2019-08-19 c1ed6226183527f60bf7a9f15b780a2c8f67e69a
agent.go
@@ -40,6 +40,8 @@
const (
   QueryEventGetDB        = "GetDatabase"
   QueryEventUpdateDBData = "UpdateDBData"
   UserEventSyncSql      = "SyncSql"
   UserEventSyncDbTablePersonCache      = "SyncCache"
)
// Agent warps the serf agent
@@ -110,6 +112,8 @@
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -118,12 +122,22 @@
   switch ev := event.(type) {
   case serf.UserEvent:
      fmt.Println(string(ev.Payload))
      var sqlArr []string
      sqlArr = append(sqlArr, string(ev.Payload))
      //results, err := ExecuteWriteSql(sqlArr)
      flag, _ := ExecuteSqlByGorm(sqlArr)
      fmt.Println("userEvent exec ",sqlArr,",Result:",flag)
      if ev.Name == UserEventSyncSql {
         var sqlUe SqlUserEvent
         err := json.Unmarshal(ev.Payload, &sqlUe)
         if err !=nil {
            fmt.Println("sqlUe unmarshal err:",err)
            return
         }
         if sqlUe.Owner != a.conf.NodeName {
            //results, err := ExecuteWriteSql(sqlArr)
            flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
            fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",flag)
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         SyncDbTablePersonCacheChan <- ev.Payload
      }
   case *serf.Query:
@@ -478,12 +492,34 @@
}
type SqlUserEvent struct {
   Owner string `json:"owner"`
   Sql []string `json:"sql"`
}
//SyncSql boardcast sql to cluster
func (a *Agent) SyncSql(sqlOp string) {
func (a *Agent) SyncSql(sqlOp []string) {
   // event : use to send command to operate db.
   err := a.UserEvent("SyncSql", []byte(sqlOp), false)
   var sqlUe = SqlUserEvent{
      Owner: a.conf.NodeName,
      Sql: sqlOp,
   }
   ueB, err := json.Marshal(sqlUe)
   if err !=nil {
      fmt.Println("sqlUE marshal err:",err)
      return
   }
   err = a.UserEvent(UserEventSyncSql, ueB, false)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      fmt.Println("err: ", err)
   }
}
//更新同步库的比对缓存
func (a *Agent) SyncDbTablePersonCache(b []byte) {
   err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
   if err !=nil{
      fmt.Println("UserEventSyncDbTablePersonCache err:",err)
   }
}
@@ -553,8 +589,8 @@
   n, err := a.Agent.Join(nodes, true)
   if err != nil || n == 0 {
      a.Stop()
      fmt.Println("Stop node")
      //a.Stop()
      //fmt.Println("Stop node")
      return fmt.Errorf("Error Encrypt Key!")
   }