基于serf的数据库同步模块库
liuxiaolong
2020-09-16 4f3a5a019ca1e5a3c81acdd4124253e7a86bc6d0
agent.go
@@ -32,8 +32,8 @@
   "strings"
   "time"
   "github.com/hashicorp/serf/cmd/serf/command/agent"
   "github.com/hashicorp/serf/serf"
   "basic.com/valib/serf.git/serf"
   "basic.com/valib/serf.git/cmd/serf/command/agent"
   //"github.com/apache/servicecomb-service-center/pkg/log"
   "basic.com/valib/logger.git"
)
@@ -70,7 +70,7 @@
   // create serf agent with serf config
   logger.Info("conf.Config.EncryptKey:", conf.EncryptKey)
   serfAgent, err := agent.Create(conf.Config, serfConf, nil)
   serfAgent, err := agent.Create(conf.Config, serfConf, logger.GetLogFile())
   if err != nil {
      return nil, err
   }
@@ -114,7 +114,7 @@
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
var SyncDbTablePersonCacheChan = make(chan []byte,512)
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -125,18 +125,21 @@
   switch ev := event.(type) {
   case serf.UserEvent:
      if ev.Name == UserEventSyncSql {
         logger.Info("receive a UserEventSyncSql event")
         var sqlUe SqlUserEvent
         err := json.Unmarshal(ev.Payload, &sqlUe)
         if err !=nil {
            logger.Error("sqlUe unmarshal err:",err)
            return
         }
         logger.Info("ev.LTime:", ev.LTime ,"owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
         if sqlUe.Owner != a.conf.NodeName {
            flag, e := ExecuteSqlByGorm(sqlUe.Sql)
            evTime := uint64(ev.LTime)
            logger.Info("ev.LTime:",evTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
            go func() {
               flag, e := ExecuteSqlByGorm(sqlUe.Sql)
               logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
            }()
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
@@ -189,16 +192,29 @@
         //for _, r := range rows {
         //   rowsReturn = append(rowsReturn, *r)
         //}
         logger.Info("receive QueryEventUpdateDBData, current node:", a.conf.NodeName)
         var fromP QueryTableDataParam
         err := json.Unmarshal(ev.Payload, &fromP)
         if err !=nil {
            logger.Error("Query tableNames unmarshal err")
            if query, ok := event.(*serf.Query); ok {
               if err := query.Respond([]byte("request unmarshal err")); err != nil {
                  logger.Error("query.Respond err: %s\n", err)
                  return
               }
            }
            return
         }
         logger.Info("Query tableNames:",fromP.Tables)
         datas, err := ExecuteQueryByGorm(fromP.Tables)
         if err !=nil {
            logger.Error("queryByGorm err")
            logger.Error("queryByGorm err:", err)
            if query, ok := event.(*serf.Query); ok {
               if err := query.Respond([]byte("queryByGorm err")); err != nil {
                  logger.Error("query.Respond err: %s\n", err)
                  return
               }
            }
            return
         }
         bytesReturn, err := json.Marshal(datas)
@@ -216,15 +232,16 @@
         }
         logger.Debug("targetNode:",targetNode.Name)
         if targetNode !=nil {
            addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
            sendErr := rawSendTcpMsg(addr, bytesReturn)
            go func() {
               addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
               sendErr := rawSendTcpMsg(addr, bytesReturn)
            if sendErr != nil {
               logger.Debug("sendToTcp err:",sendErr)
            } else {
               logger.Debug("sendToTcp success")
            }
               if sendErr != nil {
                  logger.Debug("sendToTcp err:",sendErr)
               } else {
                  logger.Debug("sendToTcp success")
               }
            }()
         } else {
            logger.Debug("targetNode is nil")
         }
@@ -486,7 +503,7 @@
var QueryTcpResponseChan = make(chan []byte)
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string,error) {
func (a *Agent) GetTableDataFromCluster(tableNames []string, timeout time.Duration) (*[]string,error) {
   //members: get name of first member
   mbs := a.GroupMembers(a.conf.ClusterID)
   var specmembername string
@@ -531,7 +548,7 @@
   var wg sync.WaitGroup
   wg.Add(1)
   ticker := time.NewTicker(300*time.Second)
   ticker := time.NewTicker(timeout)
   go func(tk *time.Ticker) {
      defer tk.Stop()
      defer wg.Done()