zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/serf/serf.go
@@ -1,273 +1,273 @@
package serf
import (
   "basic.com/syncdb.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "errors"
   "fmt"
   "strings"
   "sync"
   "time"
   "vamicro/config"
   "vamicro/system-service/models"
)
const (
   QueryEventUpdateDBData          = "UpdateDBData"
   QueryNodesByTopic               = "queryNodeByTopic"
   QueryRpc                        = "queryRpc"
   UserEventSyncSql                = "SyncSql"
   UserEventSyncDbTablePersonCache = "SyncCache"
   UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
   UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
   DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
   TcpTransportPort                = 30194                        //tcp传输大数据量接口
)
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
var SyncVirtualIpChan = make(chan []byte, 512)
func HandleSerfEvent(event serf.Event) {
   switch ev := event.(type) {
   case serf.UserEvent:
      if ev.Name == UserEventSyncSql {
         HandleUserEventSyncSql(ev)
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         HandleUserEventSyncDbTablePersonCache(ev)
      } else if ev.Name == UserEventSyncVirtualIp {
         HandleUserEventSyncVirtualIp(ev)
      } else if ev.Name == UserEventSyncRegisterInfo {
         HandleSyncRegisterInfo(ev)
      } else if ev.Name == DataSystemSerfSubscribe {
         HandleDataSystemSerfSub(ev)
      }
   case *serf.Query:
      if ev.Name == QueryEventUpdateDBData {
         HandleQueryEventUpdateDBData(ev)
      } else if ev.Name == QueryNodesByTopic {
         HandleOtherQuery(ev)
      } else if ev.Name == QueryRpc {
         HandleQueryRpc(ev)
      }
   case serf.MemberEvent:
      if event.EventType() == serf.EventMemberLeave {
         HandleEventMemberLeave(ev)
      } else if event.EventType() == serf.EventMemberJoin {
         HandleEventMemberJoin(ev)
      }
   default:
      logger.Warn("Unknown event type: %s\n", ev.EventType().String())
   }
}
func executeSqlByGorm(sqls []string) (bool, error) {
   if len(sqls) > 0 {
      db := models.GetDB()
      if db != nil {
         db.LogMode(false)
         defer db.LogMode(true)
         var err error
         tx := db.Begin()
         defer func() {
            if err != nil && tx != nil {
               tx.Rollback()
            }
         }()
         for _, sql := range sqls {
            result := tx.Exec(sql)
            err = result.Error
            if err != nil {
               logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
               return false, err
            }
            if result.RowsAffected == 0 {
               logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
               err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
               return false, err
            }
         }
         tx.Commit()
         return true, nil
      }
      return false, errors.New("db handle is nil")
   }
   return true, nil
}
type SqlUserEvent struct {
   Owner string   `json:"owner"`
   Sql   []string `json:"sql"`
}
type TableDesc struct {
   Cid       int         `json:"cid"`
   Name      string      `json:"name"`
   Type      string      `json:"type"`
   Notnull   bool        `json:"notnull"`
   DFltValue interface{} `json:"dflt_value"`
   Pk        int         `json:"pk"`
}
type DumpSql struct {
   Sql string `json:"sql"`
}
const (
   DbT_TableName = "dbtables"
   DBP_TableName = "dbtablepersons"
)
func DumpTables(tableNames []string) ([]string, error) {
   db := models.GetDB()
   db.LogMode(false)
   defer db.LogMode(true)
   if tableNames != nil {
      var arr []string
      var dumpSql []DumpSql
      for _, table := range tableNames {
         logger.Info("dump current tableName:", table)
         dumpSql = make([]DumpSql, 0)
         var tDescArr []TableDesc
         tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
         err := db.Raw(tSql).Scan(&tDescArr).Error
         logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
         if err != nil {
            return nil, errors.New("tableDesc err")
         }
         logger.Info(table, "'Columns is:", tDescArr)
         if tDescArr == nil || len(tDescArr) == 0 {
            return nil, errors.New(table + " has no column")
         }
         var columnNames []string
         for _, col := range tDescArr {
            columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
         }
         if table == DbT_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else if table == DBP_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
               table,
               strings.Join(columnNames, ","),
               table)
         }
         logger.Info("tSql:", tSql)
         err = db.Raw(tSql).Scan(&dumpSql).Error
         logger.Debug("dump err:", err)
         if err != nil {
            return nil, errors.New("dump err")
         }
         if len(dumpSql) > 0 {
            for _, d := range dumpSql {
               arr = append(arr, d.Sql)
            }
         }
      }
      return arr, nil
   }
   return nil, errors.New("tableNames is nil")
}
type QueryTableDataParam struct {
   Tables []string `json:"tables"`
   From   string   `json:"from"`
}
var QueryTcpResponseChan = make(chan []byte)
func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) {
   //members: get name of first member
   mbs := a.GroupMembers(clusterId)
   var specmembername string
   for _, m := range mbs {
      logger.Info("m", m)
      if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
         if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
            if strings.HasPrefix(m.Name, "DSVAD") {
               specmembername = m.Name
               break
            }
         } else {
            specmembername = m.Name
            break
         }
      }
   }
   logger.Info("mbs:", mbs, "specmembername:", specmembername)
   if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
      return nil, errors.New("specmembername not found")
   }
   //query: get db file.
   params := serf.QueryParam{
      FilterNodes: strings.Fields(specmembername),
   }
   //get db tables
   var fromP = QueryTableDataParam{
      Tables: tableNames,
      From:   config.Server.AnalyServerId,
   }
   tBytes, _ := json.Marshal(fromP)
   resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      logger.Error("err: ", err)
   }
   logger.Info("Query.resp.err:", err, "resp:", resp)
   var dumpSqls []string
   var wg sync.WaitGroup
   wg.Add(1)
   ticker := time.NewTicker(timeout)
   go func(tk *time.Ticker) {
      defer tk.Stop()
      defer wg.Done()
      for {
         select {
         case <-tk.C:
            return
         case msg := <-QueryTcpResponseChan:
            logger.Info("Query response's len:", len(msg))
            err := json.Unmarshal(msg, &dumpSqls)
            if err == nil {
               logger.Error("dumpSql:", dumpSqls)
               logger.Error("data dump success")
            }
            return
         }
      }
   }(ticker)
   wg.Wait()
   return &dumpSqls, nil
}
func SyncSql(sqlOp []string) {
   var sqlUe = SqlUserEvent{
      Owner: config.Server.AnalyServerId,
      Sql:   sqlOp,
   }
   ueB, err := json.Marshal(sqlUe)
   if err != nil {
      logger.Error("sqlUE marshal err:", err)
      return
   }
   err = Agent.UserEvent(UserEventSyncSql, ueB, false)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      logger.Error("err: ", err)
   }
}
package serf
import (
   "basic.com/syncdb.git"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "errors"
   "fmt"
   "strings"
   "sync"
   "time"
   "vamicro/config"
   "vamicro/system-service/models"
)
const (
   QueryEventUpdateDBData          = "UpdateDBData"
   QueryNodesByTopic               = "queryNodeByTopic"
   QueryRpc                        = "queryRpc"
   UserEventSyncSql                = "SyncSql"
   UserEventSyncDbTablePersonCache = "SyncCache"
   UserEventSyncVirtualIp          = "SyncVirtualIp"              //漂移ip修改
   UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //同步注册信息
   DataSystemSerfSubscribe         = "data-system-serf-subscribe" //各app从serf订阅消息
   TcpTransportPort                = 30194                        //tcp传输大数据量接口
)
var SyncDbTablePersonCacheChan = make(chan []byte, 512)
var SyncVirtualIpChan = make(chan []byte, 512)
func HandleSerfEvent(event serf.Event) {
   switch ev := event.(type) {
   case serf.UserEvent:
      if ev.Name == UserEventSyncSql {
         HandleUserEventSyncSql(ev)
      } else if ev.Name == UserEventSyncDbTablePersonCache {
         HandleUserEventSyncDbTablePersonCache(ev)
      } else if ev.Name == UserEventSyncVirtualIp {
         HandleUserEventSyncVirtualIp(ev)
      } else if ev.Name == UserEventSyncRegisterInfo {
         HandleSyncRegisterInfo(ev)
      } else if ev.Name == DataSystemSerfSubscribe {
         HandleDataSystemSerfSub(ev)
      }
   case *serf.Query:
      if ev.Name == QueryEventUpdateDBData {
         HandleQueryEventUpdateDBData(ev)
      } else if ev.Name == QueryNodesByTopic {
         HandleOtherQuery(ev)
      } else if ev.Name == QueryRpc {
         HandleQueryRpc(ev)
      }
   case serf.MemberEvent:
      if event.EventType() == serf.EventMemberLeave {
         HandleEventMemberLeave(ev)
      } else if event.EventType() == serf.EventMemberJoin {
         HandleEventMemberJoin(ev)
      }
   default:
      logger.Warn("Unknown event type: %s\n", ev.EventType().String())
   }
}
func executeSqlByGorm(sqls []string) (bool, error) {
   if len(sqls) > 0 {
      db := models.GetDB()
      if db != nil {
         db.LogMode(false)
         defer db.LogMode(true)
         var err error
         tx := db.Begin()
         defer func() {
            if err != nil && tx != nil {
               tx.Rollback()
            }
         }()
         for _, sql := range sqls {
            result := tx.Exec(sql)
            err = result.Error
            if err != nil {
               logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
               return false, err
            }
            if result.RowsAffected == 0 {
               logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
               err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
               return false, err
            }
         }
         tx.Commit()
         return true, nil
      }
      return false, errors.New("db handle is nil")
   }
   return true, nil
}
type SqlUserEvent struct {
   Owner string   `json:"owner"`
   Sql   []string `json:"sql"`
}
type TableDesc struct {
   Cid       int         `json:"cid"`
   Name      string      `json:"name"`
   Type      string      `json:"type"`
   Notnull   bool        `json:"notnull"`
   DFltValue interface{} `json:"dflt_value"`
   Pk        int         `json:"pk"`
}
type DumpSql struct {
   Sql string `json:"sql"`
}
const (
   DbT_TableName = "dbtables"
   DBP_TableName = "dbtablepersons"
)
func DumpTables(tableNames []string) ([]string, error) {
   db := models.GetDB()
   db.LogMode(false)
   defer db.LogMode(true)
   if tableNames != nil {
      var arr []string
      var dumpSql []DumpSql
      for _, table := range tableNames {
         logger.Info("dump current tableName:", table)
         dumpSql = make([]DumpSql, 0)
         var tDescArr []TableDesc
         tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
         err := db.Raw(tSql).Scan(&tDescArr).Error
         logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
         if err != nil {
            return nil, errors.New("tableDesc err")
         }
         logger.Info(table, "'Columns is:", tDescArr)
         if tDescArr == nil || len(tDescArr) == 0 {
            return nil, errors.New(table + " has no column")
         }
         var columnNames []string
         for _, col := range tDescArr {
            columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
         }
         if table == DbT_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else if table == DBP_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
               table,
               strings.Join(columnNames, ","),
               table)
         }
         logger.Info("tSql:", tSql)
         err = db.Raw(tSql).Scan(&dumpSql).Error
         logger.Debug("dump err:", err)
         if err != nil {
            return nil, errors.New("dump err")
         }
         if len(dumpSql) > 0 {
            for _, d := range dumpSql {
               arr = append(arr, d.Sql)
            }
         }
      }
      return arr, nil
   }
   return nil, errors.New("tableNames is nil")
}
type QueryTableDataParam struct {
   Tables []string `json:"tables"`
   From   string   `json:"from"`
}
var QueryTcpResponseChan = make(chan []byte)
func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) {
   //members: get name of first member
   mbs := a.GroupMembers(clusterId)
   var specmembername string
   for _, m := range mbs {
      logger.Info("m", m)
      if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
         if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
            if strings.HasPrefix(m.Name, "DSVAD") {
               specmembername = m.Name
               break
            }
         } else {
            specmembername = m.Name
            break
         }
      }
   }
   logger.Info("mbs:", mbs, "specmembername:", specmembername)
   if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
      return nil, errors.New("specmembername not found")
   }
   //query: get db file.
   params := serf.QueryParam{
      FilterNodes: strings.Fields(specmembername),
   }
   //get db tables
   var fromP = QueryTableDataParam{
      Tables: tableNames,
      From:   config.Server.AnalyServerId,
   }
   tBytes, _ := json.Marshal(fromP)
   resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      logger.Error("err: ", err)
   }
   logger.Info("Query.resp.err:", err, "resp:", resp)
   var dumpSqls []string
   var wg sync.WaitGroup
   wg.Add(1)
   ticker := time.NewTicker(timeout)
   go func(tk *time.Ticker) {
      defer tk.Stop()
      defer wg.Done()
      for {
         select {
         case <-tk.C:
            return
         case msg := <-QueryTcpResponseChan:
            logger.Info("Query response's len:", len(msg))
            err := json.Unmarshal(msg, &dumpSqls)
            if err == nil {
               logger.Error("dumpSql:", dumpSqls)
               logger.Error("data dump success")
            }
            return
         }
      }
   }(ticker)
   wg.Wait()
   return &dumpSqls, nil
}
func SyncSql(sqlOp []string) {
   var sqlUe = SqlUserEvent{
      Owner: config.Server.AnalyServerId,
      Sql:   sqlOp,
   }
   ueB, err := json.Marshal(sqlUe)
   if err != nil {
      logger.Error("sqlUE marshal err:", err)
      return
   }
   err = Agent.UserEvent(UserEventSyncSql, ueB, false)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      logger.Error("err: ", err)
   }
}