| | |
| | | package serf |
| | | |
| | | import ( |
| | | "basic.com/syncdb.git" |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/serf.git/serf" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/system-service/models" |
| | | ) |
| | | |
// Names of the serf queries and user events dispatched by HandleSerfEvent,
// plus the TCP port used for bulk transfers.
const (
	// QueryEventUpdateDBData is the query routed to HandleQueryEventUpdateDBData.
	QueryEventUpdateDBData = "UpdateDBData"
	// QueryNodesByTopic is the query routed to HandleOtherQuery.
	QueryNodesByTopic = "queryNodeByTopic"
	// QueryRpc is the query routed to HandleQueryRpc.
	QueryRpc = "queryRpc"
	// UserEventSyncSql is the user event carrying SQL statements to replicate.
	UserEventSyncSql = "SyncSql"
	// UserEventSyncDbTablePersonCache is the user event routed to
	// HandleUserEventSyncDbTablePersonCache.
	UserEventSyncDbTablePersonCache = "SyncCache"
	// UserEventSyncVirtualIp announces a change of the floating (virtual) IP.
	UserEventSyncVirtualIp = "SyncVirtualIp"
	// UserEventSyncRegisterInfo synchronizes registration information.
	UserEventSyncRegisterInfo = "SyncRegisterInfo"
	// DataSystemSerfSubscribe is the event through which apps subscribe to
	// messages from serf.
	DataSystemSerfSubscribe = "data-system-serf-subscribe"
	// TcpTransportPort is the TCP port used to transfer large amounts of data.
	TcpTransportPort = 30194
)
| | | |
// Buffered byte-slice channels, each with room for 512 pending payloads.
// NOTE(review): presumably filled by the SyncCache / SyncVirtualIp user-event
// handlers; the producers and consumers are not visible here, confirm.
var (
	SyncDbTablePersonCacheChan = make(chan []byte, 512)
	SyncVirtualIpChan          = make(chan []byte, 512)
)
| | | |
| | | func HandleSerfEvent(event serf.Event) { |
| | | switch ev := event.(type) { |
| | | case serf.UserEvent: |
| | | if ev.Name == UserEventSyncSql { |
| | | HandleUserEventSyncSql(ev) |
| | | } else if ev.Name == UserEventSyncDbTablePersonCache { |
| | | HandleUserEventSyncDbTablePersonCache(ev) |
| | | } else if ev.Name == UserEventSyncVirtualIp { |
| | | HandleUserEventSyncVirtualIp(ev) |
| | | } else if ev.Name == UserEventSyncRegisterInfo { |
| | | HandleSyncRegisterInfo(ev) |
| | | } else if ev.Name == DataSystemSerfSubscribe { |
| | | HandleDataSystemSerfSub(ev) |
| | | } |
| | | case *serf.Query: |
| | | if ev.Name == QueryEventUpdateDBData { |
| | | HandleQueryEventUpdateDBData(ev) |
| | | } else if ev.Name == QueryNodesByTopic { |
| | | HandleOtherQuery(ev) |
| | | } else if ev.Name == QueryRpc { |
| | | HandleQueryRpc(ev) |
| | | } |
| | | case serf.MemberEvent: |
| | | if event.EventType() == serf.EventMemberLeave { |
| | | HandleEventMemberLeave(ev) |
| | | } else if event.EventType() == serf.EventMemberJoin { |
| | | HandleEventMemberJoin(ev) |
| | | } |
| | | |
| | | default: |
| | | logger.Warn("Unknown event type: %s\n", ev.EventType().String()) |
| | | } |
| | | } |
| | | |
| | | func executeSqlByGorm(sqls []string) (bool, error) { |
| | | if len(sqls) > 0 { |
| | | db := models.GetDB() |
| | | if db != nil { |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | var err error |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | for _, sql := range sqls { |
| | | result := tx.Exec(sql) |
| | | err = result.Error |
| | | if err != nil { |
| | | logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) |
| | | return false, err |
| | | } |
| | | if result.RowsAffected == 0 { |
| | | logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | err = errors.New("ExecuteSqlByGorm RowsAffected == 0") |
| | | return false, err |
| | | } |
| | | } |
| | | tx.Commit() |
| | | return true, nil |
| | | } |
| | | return false, errors.New("db handle is nil") |
| | | } |
| | | return true, nil |
| | | } |
| | | |
type (
	// SqlUserEvent is the JSON payload of a SyncSql user event: the SQL
	// statements to replicate and the id of the node that issued them.
	SqlUserEvent struct {
		Owner string   `json:"owner"`
		Sql   []string `json:"sql"`
	}

	// TableDesc receives one row of `PRAGMA table_info(...)`, i.e. the
	// description of a single table column.
	TableDesc struct {
		Cid       int         `json:"cid"`
		Name      string      `json:"name"`
		Type      string      `json:"type"`
		Notnull   bool        `json:"notnull"`
		DFltValue interface{} `json:"dflt_value"`
		Pk        int         `json:"pk"`
	}

	// DumpSql receives one generated INSERT statement from the dump query
	// built in DumpTables.
	DumpSql struct {
		Sql string `json:"sql"`
	}
)
| | | |
// Tables for which DumpTables only exports rows whose analyServerId is empty
// or NULL.
const (
	// DbT_TableName is filtered directly on its analyServerId column.
	DbT_TableName = "dbtables"
	// DBP_TableName is filtered through its tableId reference to dbTables.
	DBP_TableName = "dbtablepersons"
)
| | | |
| | | func DumpTables(tableNames []string) ([]string, error) { |
| | | db := models.GetDB() |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | if tableNames != nil { |
| | | var arr []string |
| | | var dumpSql []DumpSql |
| | | for _, table := range tableNames { |
| | | logger.Info("dump current tableName:", table) |
| | | dumpSql = make([]DumpSql, 0) |
| | | var tDescArr []TableDesc |
| | | tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table) |
| | | err := db.Raw(tSql).Scan(&tDescArr).Error |
| | | logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr)) |
| | | if err != nil { |
| | | return nil, errors.New("tableDesc err") |
| | | } |
| | | logger.Info(table, "'Columns is:", tDescArr) |
| | | if tDescArr == nil || len(tDescArr) == 0 { |
| | | return nil, errors.New(table + " has no column") |
| | | } |
| | | var columnNames []string |
| | | for _, col := range tDescArr { |
| | | columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name)) |
| | | } |
| | | if table == DbT_TableName { |
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`, |
| | | table, |
| | | strings.Join(columnNames, ","), |
| | | table) |
| | | } else if table == DBP_TableName { |
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`, |
| | | table, |
| | | strings.Join(columnNames, ","), |
| | | table) |
| | | } else { |
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`, |
| | | table, |
| | | strings.Join(columnNames, ","), |
| | | table) |
| | | } |
| | | |
| | | logger.Info("tSql:", tSql) |
| | | |
| | | err = db.Raw(tSql).Scan(&dumpSql).Error |
| | | logger.Debug("dump err:", err) |
| | | if err != nil { |
| | | return nil, errors.New("dump err") |
| | | } |
| | | if len(dumpSql) > 0 { |
| | | for _, d := range dumpSql { |
| | | arr = append(arr, d.Sql) |
| | | } |
| | | } |
| | | |
| | | } |
| | | return arr, nil |
| | | } |
| | | return nil, errors.New("tableNames is nil") |
| | | } |
| | | |
// QueryTableDataParam is the JSON payload of an UpdateDBData query: the
// tables being requested and the id of the requesting node.
type QueryTableDataParam struct {
	Tables []string `json:"tables"`
	From   string   `json:"from"`
}
| | | |
// QueryTcpResponseChan is the unbuffered channel from which
// GetTableDataFromCluster reads the JSON-encoded reply to its query.
// NOTE(review): the sender is not visible here, presumably the TCP
// transport listener; confirm.
var QueryTcpResponseChan = make(chan []byte)
| | | |
| | | func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) { |
| | | //members: get name of first member |
| | | mbs := a.GroupMembers(clusterId) |
| | | var specmembername string |
| | | for _, m := range mbs { |
| | | logger.Info("m", m) |
| | | if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad |
| | | if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { |
| | | if strings.HasPrefix(m.Name, "DSVAD") { |
| | | specmembername = m.Name |
| | | break |
| | | } |
| | | } else { |
| | | specmembername = m.Name |
| | | break |
| | | } |
| | | } |
| | | } |
| | | logger.Info("mbs:", mbs, "specmembername:", specmembername) |
| | | if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点 |
| | | return nil, errors.New("specmembername not found") |
| | | } |
| | | |
| | | //query: get db file. |
| | | params := serf.QueryParam{ |
| | | FilterNodes: strings.Fields(specmembername), |
| | | } |
| | | |
| | | //get db tables |
| | | var fromP = QueryTableDataParam{ |
| | | Tables: tableNames, |
| | | From: config.Server.AnalyServerId, |
| | | } |
| | | tBytes, _ := json.Marshal(fromP) |
| | | |
| | | resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) |
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") { |
| | | logger.Error("err: ", err) |
| | | } |
| | | logger.Info("Query.resp.err:", err, "resp:", resp) |
| | | |
| | | var dumpSqls []string |
| | | |
| | | var wg sync.WaitGroup |
| | | wg.Add(1) |
| | | ticker := time.NewTicker(timeout) |
| | | go func(tk *time.Ticker) { |
| | | defer tk.Stop() |
| | | defer wg.Done() |
| | | for { |
| | | select { |
| | | case <-tk.C: |
| | | return |
| | | case msg := <-QueryTcpResponseChan: |
| | | logger.Info("Query response's len:", len(msg)) |
| | | err := json.Unmarshal(msg, &dumpSqls) |
| | | if err == nil { |
| | | logger.Error("dumpSql:", dumpSqls) |
| | | logger.Error("data dump success") |
| | | } |
| | | return |
| | | } |
| | | } |
| | | }(ticker) |
| | | wg.Wait() |
| | | return &dumpSqls, nil |
| | | } |
| | | |
| | | func SyncSql(sqlOp []string) { |
| | | var sqlUe = SqlUserEvent{ |
| | | Owner: config.Server.AnalyServerId, |
| | | Sql: sqlOp, |
| | | } |
| | | ueB, err := json.Marshal(sqlUe) |
| | | if err != nil { |
| | | logger.Error("sqlUE marshal err:", err) |
| | | return |
| | | } |
| | | err = Agent.UserEvent(UserEventSyncSql, ueB, false) |
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") { |
| | | logger.Error("err: ", err) |
| | | } |
| | | } |
| | | package serf
|
| | |
|
| | | import (
|
| | | "basic.com/syncdb.git"
|
| | | "basic.com/valib/logger.git"
|
| | | "basic.com/valib/serf.git/serf"
|
| | | "encoding/json"
|
| | | "errors"
|
| | | "fmt"
|
| | | "strings"
|
| | | "sync"
|
| | | "time"
|
| | | "vamicro/config"
|
| | | "vamicro/system-service/models"
|
| | | )
|
| | |
|
// Names of the serf queries and user events dispatched by HandleSerfEvent,
// plus the TCP port used for bulk transfers.
const (
	// QueryEventUpdateDBData is the query routed to HandleQueryEventUpdateDBData.
	QueryEventUpdateDBData = "UpdateDBData"
	// QueryNodesByTopic is the query routed to HandleOtherQuery.
	QueryNodesByTopic = "queryNodeByTopic"
	// QueryRpc is the query routed to HandleQueryRpc.
	QueryRpc = "queryRpc"
	// UserEventSyncSql is the user event carrying SQL statements to replicate.
	UserEventSyncSql = "SyncSql"
	// UserEventSyncDbTablePersonCache is the user event routed to
	// HandleUserEventSyncDbTablePersonCache.
	UserEventSyncDbTablePersonCache = "SyncCache"
	// UserEventSyncVirtualIp announces a change of the floating (virtual) IP.
	UserEventSyncVirtualIp = "SyncVirtualIp"
	// UserEventSyncRegisterInfo synchronizes registration information.
	UserEventSyncRegisterInfo = "SyncRegisterInfo"
	// DataSystemSerfSubscribe is the event through which apps subscribe to
	// messages from serf.
	DataSystemSerfSubscribe = "data-system-serf-subscribe"
	// TcpTransportPort is the TCP port used to transfer large amounts of data.
	TcpTransportPort = 30194
)
|
| | |
|
// Buffered byte-slice channels, each with room for 512 pending payloads.
// NOTE(review): presumably filled by the SyncCache / SyncVirtualIp user-event
// handlers; the producers and consumers are not visible here, confirm.
var (
	SyncDbTablePersonCacheChan = make(chan []byte, 512)
	SyncVirtualIpChan          = make(chan []byte, 512)
)
|
| | |
|
| | | func HandleSerfEvent(event serf.Event) {
|
| | | switch ev := event.(type) {
|
| | | case serf.UserEvent:
|
| | | if ev.Name == UserEventSyncSql {
|
| | | HandleUserEventSyncSql(ev)
|
| | | } else if ev.Name == UserEventSyncDbTablePersonCache {
|
| | | HandleUserEventSyncDbTablePersonCache(ev)
|
| | | } else if ev.Name == UserEventSyncVirtualIp {
|
| | | HandleUserEventSyncVirtualIp(ev)
|
| | | } else if ev.Name == UserEventSyncRegisterInfo {
|
| | | HandleSyncRegisterInfo(ev)
|
| | | } else if ev.Name == DataSystemSerfSubscribe {
|
| | | HandleDataSystemSerfSub(ev)
|
| | | }
|
| | | case *serf.Query:
|
| | | if ev.Name == QueryEventUpdateDBData {
|
| | | HandleQueryEventUpdateDBData(ev)
|
| | | } else if ev.Name == QueryNodesByTopic {
|
| | | HandleOtherQuery(ev)
|
| | | } else if ev.Name == QueryRpc {
|
| | | HandleQueryRpc(ev)
|
| | | }
|
| | | case serf.MemberEvent:
|
| | | if event.EventType() == serf.EventMemberLeave {
|
| | | HandleEventMemberLeave(ev)
|
| | | } else if event.EventType() == serf.EventMemberJoin {
|
| | | HandleEventMemberJoin(ev)
|
| | | }
|
| | |
|
| | | default:
|
| | | logger.Warn("Unknown event type: %s\n", ev.EventType().String())
|
| | | }
|
| | | }
|
| | |
|
| | | func executeSqlByGorm(sqls []string) (bool, error) {
|
| | | if len(sqls) > 0 {
|
| | | db := models.GetDB()
|
| | | if db != nil {
|
| | | db.LogMode(false)
|
| | | defer db.LogMode(true)
|
| | | var err error
|
| | | tx := db.Begin()
|
| | | defer func() {
|
| | | if err != nil && tx != nil {
|
| | | tx.Rollback()
|
| | | }
|
| | | }()
|
| | | for _, sql := range sqls {
|
| | | result := tx.Exec(sql)
|
| | | err = result.Error
|
| | | if err != nil {
|
| | | logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql)
|
| | | return false, err
|
| | | }
|
| | | if result.RowsAffected == 0 {
|
| | | logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql)
|
| | | err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
|
| | | return false, err
|
| | | }
|
| | | }
|
| | | tx.Commit()
|
| | | return true, nil
|
| | | }
|
| | | return false, errors.New("db handle is nil")
|
| | | }
|
| | | return true, nil
|
| | | }
|
| | |
|
type (
	// SqlUserEvent is the JSON payload of a SyncSql user event: the SQL
	// statements to replicate and the id of the node that issued them.
	SqlUserEvent struct {
		Owner string   `json:"owner"`
		Sql   []string `json:"sql"`
	}

	// TableDesc receives one row of `PRAGMA table_info(...)`, i.e. the
	// description of a single table column.
	TableDesc struct {
		Cid       int         `json:"cid"`
		Name      string      `json:"name"`
		Type      string      `json:"type"`
		Notnull   bool        `json:"notnull"`
		DFltValue interface{} `json:"dflt_value"`
		Pk        int         `json:"pk"`
	}

	// DumpSql receives one generated INSERT statement from the dump query
	// built in DumpTables.
	DumpSql struct {
		Sql string `json:"sql"`
	}
)
|
| | |
|
// Tables for which DumpTables only exports rows whose analyServerId is empty
// or NULL.
const (
	// DbT_TableName is filtered directly on its analyServerId column.
	DbT_TableName = "dbtables"
	// DBP_TableName is filtered through its tableId reference to dbTables.
	DBP_TableName = "dbtablepersons"
)
|
| | |
|
| | | func DumpTables(tableNames []string) ([]string, error) {
|
| | | db := models.GetDB()
|
| | | db.LogMode(false)
|
| | | defer db.LogMode(true)
|
| | | if tableNames != nil {
|
| | | var arr []string
|
| | | var dumpSql []DumpSql
|
| | | for _, table := range tableNames {
|
| | | logger.Info("dump current tableName:", table)
|
| | | dumpSql = make([]DumpSql, 0)
|
| | | var tDescArr []TableDesc
|
| | | tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
|
| | | err := db.Raw(tSql).Scan(&tDescArr).Error
|
| | | logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr))
|
| | | if err != nil {
|
| | | return nil, errors.New("tableDesc err")
|
| | | }
|
| | | logger.Info(table, "'Columns is:", tDescArr)
|
| | | if tDescArr == nil || len(tDescArr) == 0 {
|
| | | return nil, errors.New(table + " has no column")
|
| | | }
|
| | | var columnNames []string
|
| | | for _, col := range tDescArr {
|
| | | columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
|
| | | }
|
| | | if table == DbT_TableName {
|
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
|
| | | table,
|
| | | strings.Join(columnNames, ","),
|
| | | table)
|
| | | } else if table == DBP_TableName {
|
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
|
| | | table,
|
| | | strings.Join(columnNames, ","),
|
| | | table)
|
| | | } else {
|
| | | tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
|
| | | table,
|
| | | strings.Join(columnNames, ","),
|
| | | table)
|
| | | }
|
| | |
|
| | | logger.Info("tSql:", tSql)
|
| | |
|
| | | err = db.Raw(tSql).Scan(&dumpSql).Error
|
| | | logger.Debug("dump err:", err)
|
| | | if err != nil {
|
| | | return nil, errors.New("dump err")
|
| | | }
|
| | | if len(dumpSql) > 0 {
|
| | | for _, d := range dumpSql {
|
| | | arr = append(arr, d.Sql)
|
| | | }
|
| | | }
|
| | |
|
| | | }
|
| | | return arr, nil
|
| | | }
|
| | | return nil, errors.New("tableNames is nil")
|
| | | }
|
| | |
|
// QueryTableDataParam is the JSON payload of an UpdateDBData query: the
// tables being requested and the id of the requesting node.
type QueryTableDataParam struct {
	Tables []string `json:"tables"`
	From   string   `json:"from"`
}
|
| | |
|
// QueryTcpResponseChan is the unbuffered channel from which
// GetTableDataFromCluster reads the JSON-encoded reply to its query.
// NOTE(review): the sender is not visible here, presumably the TCP
// transport listener; confirm.
var QueryTcpResponseChan = make(chan []byte)
|
| | |
|
| | | func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) {
|
| | | //members: get name of first member
|
| | | mbs := a.GroupMembers(clusterId)
|
| | | var specmembername string
|
| | | for _, m := range mbs {
|
| | | logger.Info("m", m)
|
| | | if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
|
| | | if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
|
| | | if strings.HasPrefix(m.Name, "DSVAD") {
|
| | | specmembername = m.Name
|
| | | break
|
| | | }
|
| | | } else {
|
| | | specmembername = m.Name
|
| | | break
|
| | | }
|
| | | }
|
| | | }
|
| | | logger.Info("mbs:", mbs, "specmembername:", specmembername)
|
| | | if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
|
| | | return nil, errors.New("specmembername not found")
|
| | | }
|
| | |
|
| | | //query: get db file.
|
| | | params := serf.QueryParam{
|
| | | FilterNodes: strings.Fields(specmembername),
|
| | | }
|
| | |
|
| | | //get db tables
|
| | | var fromP = QueryTableDataParam{
|
| | | Tables: tableNames,
|
| | | From: config.Server.AnalyServerId,
|
| | | }
|
| | | tBytes, _ := json.Marshal(fromP)
|
| | |
|
| | | resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms)
|
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
| | | logger.Error("err: ", err)
|
| | | }
|
| | | logger.Info("Query.resp.err:", err, "resp:", resp)
|
| | |
|
| | | var dumpSqls []string
|
| | |
|
| | | var wg sync.WaitGroup
|
| | | wg.Add(1)
|
| | | ticker := time.NewTicker(timeout)
|
| | | go func(tk *time.Ticker) {
|
| | | defer tk.Stop()
|
| | | defer wg.Done()
|
| | | for {
|
| | | select {
|
| | | case <-tk.C:
|
| | | return
|
| | | case msg := <-QueryTcpResponseChan:
|
| | | logger.Info("Query response's len:", len(msg))
|
| | | err := json.Unmarshal(msg, &dumpSqls)
|
| | | if err == nil {
|
| | | logger.Error("dumpSql:", dumpSqls)
|
| | | logger.Error("data dump success")
|
| | | }
|
| | | return
|
| | | }
|
| | | }
|
| | | }(ticker)
|
| | | wg.Wait()
|
| | | return &dumpSqls, nil
|
| | | }
|
| | |
|
| | | func SyncSql(sqlOp []string) {
|
| | | var sqlUe = SqlUserEvent{
|
| | | Owner: config.Server.AnalyServerId,
|
| | | Sql: sqlOp,
|
| | | }
|
| | | ueB, err := json.Marshal(sqlUe)
|
| | | if err != nil {
|
| | | logger.Error("sqlUE marshal err:", err)
|
| | | return
|
| | | }
|
| | | err = Agent.UserEvent(UserEventSyncSql, ueB, false)
|
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
| | | logger.Error("err: ", err)
|
| | | }
|
| | | }
|