基于serf的数据库同步模块库
liuxiaolong
2020-10-19 0913731a5f381be02bfafbabddd32446361dbecd
agent.go
@@ -32,10 +32,11 @@
   "strings"
   "time"
   "basic.com/valib/serf.git"
   "basic.com/valib/serf.git/serf"
   "basic.com/valib/serf.git/cmd/serf/command/agent"
   //"github.com/apache/servicecomb-service-center/pkg/log"
   "basic.com/valib/logger.git"
   "github.com/satori/go.uuid"
)
const (
@@ -114,7 +115,7 @@
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
// SyncDbTablePersonCacheChan is a package-level channel of raw byte payloads.
//
// It is buffered with a capacity of 512, so up to 512 sends complete without
// a receiver being ready; an unbuffered channel would block every sender
// until a receiver picked the payload up.
//
// NOTE(review): presumably this carries the payloads of
// UserEventSyncDbTablePersonCache user events — confirm against the sender
// and the consumer of this channel.
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
// HandleEvent handles serf.EventMemberJoin events:
// it waits for members to join until the number of group members equals "groupExpect".
@@ -139,6 +140,18 @@
            go func() {
               flag, e := ExecuteSqlByGorm(sqlUe.Sql)
               logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
               logResult := "0"
               if flag {
                  logResult = "1"
               }
               logErr := ""
               if e != nil {
                  logErr = e.Error()
               }
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
            }()
         }
      } else if ev.Name == UserEventSyncDbTablePersonCache {
@@ -236,11 +249,21 @@
               addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
               sendErr := rawSendTcpMsg(addr, bytesReturn)
               if sendErr != nil {
                  logger.Debug("sendToTcp err:",sendErr)
               } else {
               logLT := strconv.Itoa(int(ev.LTime))
               logT := time.Now().Format("2006-01-02 15:04:05")
               logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name,"'","''")
               logResult := "0"
               logErr := ""
               if sendErr ==nil {
                  logResult = "1"
                  logger.Debug("sendToTcp success")
               } else {
                  logErr = sendErr.Error()
                  logger.Debug("sendToTcp err:",sendErr)
               }
               ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
            }()
         } else {
            logger.Debug("targetNode is nil")
@@ -258,18 +281,42 @@
         if ev.Members !=nil && len(ev.Members) ==1 {
            leaveMember := ev.Members[0]
            leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'"
            ExecuteSqlByGorm([]string{ leaveSql })
            flag,e := ExecuteSqlByGorm([]string{ leaveSql })
            logger.Info("EventMemberLeave,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := strings.ReplaceAll(leaveSql, "'","''")
            logResult := "0"
            if flag {
               logResult = "1"
            }
            logErr := ""
            if e != nil {
               logErr = e.Error()
            }
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      } else if event.EventType() == serf.EventMemberJoin {
         if ev.Members !=nil && len(ev.Members) ==1 {
            leaveMember := ev.Members[0]
            leaveSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
            ExecuteSqlByGorm([]string{ leaveSql })
            joinSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
            flag,e := ExecuteSqlByGorm([]string{joinSql})
            logger.Info("EventMemberJoin,current Members:",ev.Members)
            logLT := ""
            logT := time.Now().Format("2006-01-02 15:04:05")
            logSql := strings.ReplaceAll(joinSql, "'", "''")
            logResult := "0"
            if flag {
               logResult = "1"
            }
            logErr := ""
            if e != nil {
               logErr = e.Error()
            }
            ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
         }
         return
      }