From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 05 九月 2023 09:58:13 +0800 Subject: [PATCH] 修复编译 --- system-service/serf/serf.go | 546 +++++++++++++++++++++++++++--------------------------- 1 files changed, 273 insertions(+), 273 deletions(-) diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index 58b9af2..9158097 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -1,273 +1,273 @@ -package serf - -import ( - "basic.com/syncdb.git" - "basic.com/valib/logger.git" - "basic.com/valib/serf.git/serf" - "encoding/json" - "errors" - "fmt" - "strings" - "sync" - "time" - "vamicro/config" - "vamicro/system-service/models" -) - -const ( - QueryEventUpdateDBData = "UpdateDBData" - QueryNodesByTopic = "queryNodeByTopic" - QueryRpc = "queryRpc" - UserEventSyncSql = "SyncSql" - UserEventSyncDbTablePersonCache = "SyncCache" - UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 - UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 - DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 - TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛 -) - -var SyncDbTablePersonCacheChan = make(chan []byte, 512) -var SyncVirtualIpChan = make(chan []byte, 512) - -func HandleSerfEvent(event serf.Event) { - switch ev := event.(type) { - case serf.UserEvent: - if ev.Name == UserEventSyncSql { - HandleUserEventSyncSql(ev) - } else if ev.Name == UserEventSyncDbTablePersonCache { - HandleUserEventSyncDbTablePersonCache(ev) - } else if ev.Name == UserEventSyncVirtualIp { - HandleUserEventSyncVirtualIp(ev) - } else if ev.Name == UserEventSyncRegisterInfo { - HandleSyncRegisterInfo(ev) - } else if ev.Name == DataSystemSerfSubscribe { - HandleDataSystemSerfSub(ev) - } - case *serf.Query: - if ev.Name == QueryEventUpdateDBData { - HandleQueryEventUpdateDBData(ev) - } else if ev.Name == QueryNodesByTopic { - HandleOtherQuery(ev) - } else if ev.Name == QueryRpc { - HandleQueryRpc(ev) - } - case serf.MemberEvent: - if event.EventType() == serf.EventMemberLeave { - HandleEventMemberLeave(ev) - } else if event.EventType() == serf.EventMemberJoin { - HandleEventMemberJoin(ev) - } - - default: - logger.Warn("Unknown event type: %s\n", ev.EventType().String()) - } -} - -func executeSqlByGorm(sqls []string) (bool, error) { - if len(sqls) > 0 { - db := models.GetDB() - if db != nil { - db.LogMode(false) - defer db.LogMode(true) - var err error - tx := db.Begin() - defer func() { - if err != nil && tx != nil { - tx.Rollback() - } - }() - for _, sql := range sqls { - result := tx.Exec(sql) - err = result.Error - if err != nil { - logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) - return false, err - } - if result.RowsAffected == 0 { - logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) - err = errors.New("ExecuteSqlByGorm RowsAffected == 0") - return false, err - } - } - tx.Commit() - return true, nil - } - return false, errors.New("db handle is nil") - } - return true, nil -} - -type SqlUserEvent struct { - Owner string `json:"owner"` - Sql []string `json:"sql"` -} - -type TableDesc struct { - Cid int `json:"cid"` - Name string `json:"name"` - Type string `json:"type"` - Notnull bool `json:"notnull"` - DFltValue interface{} `json:"dflt_value"` - Pk int `json:"pk"` -} - -type DumpSql struct { - Sql string `json:"sql"` -} - -const ( - DbT_TableName = "dbtables" - DBP_TableName = "dbtablepersons" -) - -func DumpTables(tableNames []string) ([]string, error) { - db := models.GetDB() - db.LogMode(false) - defer db.LogMode(true) - if tableNames != nil { - var arr []string - var dumpSql []DumpSql - for _, table := range tableNames { - logger.Info("dump current tableName:", table) - dumpSql = make([]DumpSql, 0) - var tDescArr []TableDesc - tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table) - err := db.Raw(tSql).Scan(&tDescArr).Error - logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr)) - if err != nil { - return nil, errors.New("tableDesc err") - } - logger.Info(table, "'Columns is:", tDescArr) - if tDescArr == nil || len(tDescArr) == 0 { - return nil, errors.New(table + " has no column") - } - var columnNames []string - for _, col := range tDescArr { - columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name)) - } - if table == DbT_TableName { - tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`, - table, - strings.Join(columnNames, ","), - table) - } else if table == DBP_TableName { - tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`, - table, - strings.Join(columnNames, ","), - table) - } else { - tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`, - table, - strings.Join(columnNames, ","), - table) - } - - logger.Info("tSql:", tSql) - - err = db.Raw(tSql).Scan(&dumpSql).Error - logger.Debug("dump err:", err) - if err != nil { - return nil, errors.New("dump err") - } - if len(dumpSql) > 0 { - for _, d := range dumpSql { - arr = append(arr, d.Sql) - } - } - - } - return arr, nil - } - return nil, errors.New("tableNames is nil") -} - -type QueryTableDataParam struct { - Tables []string `json:"tables"` - From string `json:"from"` -} - -var QueryTcpResponseChan = make(chan []byte) - -func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) { - //members: get name of first member - mbs := a.GroupMembers(clusterId) - var specmembername string - for _, m := range mbs { - logger.Info("m", m) - if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad - if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { - if strings.HasPrefix(m.Name, "DSVAD") { - specmembername = m.Name - break - } - } else { - specmembername = m.Name - break - } - } - } - logger.Info("mbs:", mbs, "specmembername:", specmembername) - if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐� - return nil, errors.New("specmembername not found") - } - - //query: get db file. - params := serf.QueryParam{ - FilterNodes: strings.Fields(specmembername), - } - - //get db tables - var fromP = QueryTableDataParam{ - Tables: tableNames, - From: config.Server.AnalyServerId, - } - tBytes, _ := json.Marshal(fromP) - - resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) - if err == nil || !strings.Contains(err.Error(), "cannot contain") { - logger.Error("err: ", err) - } - logger.Info("Query.resp.err:", err, "resp:", resp) - - var dumpSqls []string - - var wg sync.WaitGroup - wg.Add(1) - ticker := time.NewTicker(timeout) - go func(tk *time.Ticker) { - defer tk.Stop() - defer wg.Done() - for { - select { - case <-tk.C: - return - case msg := <-QueryTcpResponseChan: - logger.Info("Query response's len:", len(msg)) - err := json.Unmarshal(msg, &dumpSqls) - if err == nil { - logger.Error("dumpSql:", dumpSqls) - logger.Error("data dump success") - } - return - } - } - }(ticker) - wg.Wait() - return &dumpSqls, nil -} - -func SyncSql(sqlOp []string) { - var sqlUe = SqlUserEvent{ - Owner: config.Server.AnalyServerId, - Sql: sqlOp, - } - ueB, err := json.Marshal(sqlUe) - if err != nil { - logger.Error("sqlUE marshal err:", err) - return - } - err = Agent.UserEvent(UserEventSyncSql, ueB, false) - if err == nil || !strings.Contains(err.Error(), "cannot contain") { - logger.Error("err: ", err) - } -} +package serf + +import ( + "basic.com/syncdb.git" + "basic.com/valib/logger.git" + "basic.com/valib/serf.git/serf" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + "vamicro/config" + "vamicro/system-service/models" +) + +const ( + QueryEventUpdateDBData = "UpdateDBData" + QueryNodesByTopic = "queryNodeByTopic" + QueryRpc = "queryRpc" + UserEventSyncSql = "SyncSql" + UserEventSyncDbTablePersonCache = "SyncCache" + UserEventSyncVirtualIp = "SyncVirtualIp" //婕傜Щip淇敼 + UserEventSyncRegisterInfo = "SyncRegisterInfo" //鍚屾娉ㄥ唽淇℃伅 + DataSystemSerfSubscribe = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅 + TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛 +) + +var SyncDbTablePersonCacheChan = make(chan []byte, 512) +var SyncVirtualIpChan = make(chan []byte, 512) + +func HandleSerfEvent(event serf.Event) { + switch ev := event.(type) { + case serf.UserEvent: + if ev.Name == UserEventSyncSql { + HandleUserEventSyncSql(ev) + } else if ev.Name == UserEventSyncDbTablePersonCache { + HandleUserEventSyncDbTablePersonCache(ev) + } else if ev.Name == UserEventSyncVirtualIp { + HandleUserEventSyncVirtualIp(ev) + } else if ev.Name == UserEventSyncRegisterInfo { + HandleSyncRegisterInfo(ev) + } else if ev.Name == DataSystemSerfSubscribe { + HandleDataSystemSerfSub(ev) + } + case *serf.Query: + if ev.Name == QueryEventUpdateDBData { + HandleQueryEventUpdateDBData(ev) + } else if ev.Name == QueryNodesByTopic { + HandleOtherQuery(ev) + } else if ev.Name == QueryRpc { + HandleQueryRpc(ev) + } + case serf.MemberEvent: + if event.EventType() == serf.EventMemberLeave { + HandleEventMemberLeave(ev) + } else if event.EventType() == serf.EventMemberJoin { + HandleEventMemberJoin(ev) + } + + default: + logger.Warn("Unknown event type: %s\n", ev.EventType().String()) + } +} + +func executeSqlByGorm(sqls []string) (bool, error) { + if len(sqls) > 0 { + db := models.GetDB() + if db != nil { + db.LogMode(false) + defer db.LogMode(true) + var err error + tx := db.Begin() + defer func() { + if err != nil && tx != nil { + tx.Rollback() + } + }() + for _, sql := range sqls { + result := tx.Exec(sql) + err = result.Error + if err != nil { + logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) + return false, err + } + if result.RowsAffected == 0 { + logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) + err = errors.New("ExecuteSqlByGorm RowsAffected == 0") + return false, err + } + } + tx.Commit() + return true, nil + } + return false, errors.New("db handle is nil") + } + return true, nil +} + +type SqlUserEvent struct { + Owner string `json:"owner"` + Sql []string `json:"sql"` +} + +type TableDesc struct { + Cid int `json:"cid"` + Name string `json:"name"` + Type string `json:"type"` + Notnull bool `json:"notnull"` + DFltValue interface{} `json:"dflt_value"` + Pk int `json:"pk"` +} + +type DumpSql struct { + Sql string `json:"sql"` +} + +const ( + DbT_TableName = "dbtables" + DBP_TableName = "dbtablepersons" +) + +func DumpTables(tableNames []string) ([]string, error) { + db := models.GetDB() + db.LogMode(false) + defer db.LogMode(true) + if tableNames != nil { + var arr []string + var dumpSql []DumpSql + for _, table := range tableNames { + logger.Info("dump current tableName:", table) + dumpSql = make([]DumpSql, 0) + var tDescArr []TableDesc + tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table) + err := db.Raw(tSql).Scan(&tDescArr).Error + logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr)) + if err != nil { + return nil, errors.New("tableDesc err") + } + logger.Info(table, "'Columns is:", tDescArr) + if tDescArr == nil || len(tDescArr) == 0 { + return nil, errors.New(table + " has no column") + } + var columnNames []string + for _, col := range tDescArr { + columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name)) + } + if table == DbT_TableName { + tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`, + table, + strings.Join(columnNames, ","), + table) + } else if table == DBP_TableName { + tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`, + table, + strings.Join(columnNames, ","), + table) + } else { + tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`, + table, + strings.Join(columnNames, ","), + table) + } + + logger.Info("tSql:", tSql) + + err = db.Raw(tSql).Scan(&dumpSql).Error + logger.Debug("dump err:", err) + if err != nil { + return nil, errors.New("dump err") + } + if len(dumpSql) > 0 { + for _, d := range dumpSql { + arr = append(arr, d.Sql) + } + } + + } + return arr, nil + } + return nil, errors.New("tableNames is nil") +} + +type QueryTableDataParam struct { + Tables []string `json:"tables"` + From string `json:"from"` +} + +var QueryTcpResponseChan = make(chan []byte) + +func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) { + //members: get name of first member + mbs := a.GroupMembers(clusterId) + var specmembername string + for _, m := range mbs { + logger.Info("m", m) + if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad + if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { + if strings.HasPrefix(m.Name, "DSVAD") { + specmembername = m.Name + break + } + } else { + specmembername = m.Name + break + } + } + } + logger.Info("mbs:", mbs, "specmembername:", specmembername) + if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐� + return nil, errors.New("specmembername not found") + } + + //query: get db file. + params := serf.QueryParam{ + FilterNodes: strings.Fields(specmembername), + } + + //get db tables + var fromP = QueryTableDataParam{ + Tables: tableNames, + From: config.Server.AnalyServerId, + } + tBytes, _ := json.Marshal(fromP) + + resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) + if err == nil || !strings.Contains(err.Error(), "cannot contain") { + logger.Error("err: ", err) + } + logger.Info("Query.resp.err:", err, "resp:", resp) + + var dumpSqls []string + + var wg sync.WaitGroup + wg.Add(1) + ticker := time.NewTicker(timeout) + go func(tk *time.Ticker) { + defer tk.Stop() + defer wg.Done() + for { + select { + case <-tk.C: + return + case msg := <-QueryTcpResponseChan: + logger.Info("Query response's len:", len(msg)) + err := json.Unmarshal(msg, &dumpSqls) + if err == nil { + logger.Error("dumpSql:", dumpSqls) + logger.Error("data dump success") + } + return + } + } + }(ticker) + wg.Wait() + return &dumpSqls, nil +} + +func SyncSql(sqlOp []string) { + var sqlUe = SqlUserEvent{ + Owner: config.Server.AnalyServerId, + Sql: sqlOp, + } + ueB, err := json.Marshal(sqlUe) + if err != nil { + logger.Error("sqlUE marshal err:", err) + return + } + err = Agent.UserEvent(UserEventSyncSql, ueB, false) + if err == nil || !strings.Contains(err.Error(), "cannot contain") { + logger.Error("err: ", err) + } +} -- Gitblit v1.8.0