From 8324f872ef3a4d0c978a9b1d062800c6a1701c12 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 01 十二月 2023 09:58:17 +0800 Subject: [PATCH] fix --- serf/sync.go | 263 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 237 insertions(+), 26 deletions(-) diff --git a/serf/sync.go b/serf/sync.go index b49eebf..30be0ee 100644 --- a/serf/sync.go +++ b/serf/sync.go @@ -6,9 +6,12 @@ "fmt" "os" "os/signal" + "regexp" "strings" "syscall" "time" + + "apsClient/pkg/logx" "basic.com/pubsub/protomsg.git" "basic.com/valib/bhomeclient.git" @@ -16,10 +19,18 @@ "github.com/gogo/protobuf/proto" "github.com/jinzhu/gorm" + "github.com/satori/go.uuid" + "github.com/mitchellh/mapstructure" + "github.com/muesli/cache2go" ) var ( - agent = SyncServer{} + agent = SyncServer{} + dependProcs = []string{ + bhomeclient.Proc_System_Service, + } + + sqlMsgSeqCache = cache2go.Cache("syncSqlMsg") ) const ( @@ -38,6 +49,12 @@ Proc string `json:"procName"` // 杩涚▼鍚� Topic string `json:"topic"` // 涓婚 Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 +} + +type SqlMsg struct { + Id string + Sql string + Version string } type SyncServer struct { @@ -61,9 +78,10 @@ agent.queryTableTopic = procName + "/serf/query/sqls" // 璁剧疆鏃ュ織鍥炶皟 - db.SetLogger(&DbLogger{}) + db.SetLogger(&agent) + // 鍏堝叧闂棩蹇� - db.LogMode(false) + //db.LogMode(false) return &agent } @@ -101,6 +119,42 @@ bhomedbapi.InitDoReq(client.RequestOnly) //bhomedbapi.InitLog(logger.Debug) + // 闇�瑕佺瓑寰卻ystem-service杩涚▼鎴愬姛鍚姩鍚庯紝鎵嶈兘鑾峰彇闆嗙兢鐘舵��(鎴栬�呬繚璇佺▼搴忓惎鍔ㄦ椂鑾峰彇鍒版纭殑鐘舵��) + tryTimes := 0 +loop: + for { + select { + case <-q: + initChan <- false + return + default: + if tryTimes < 15 { + clients, err := client.GetRegisteredClient() + if err == nil && len(clients) > 0 { + var existingProcs []string + for _, c := range clients { + if c.Online { + existingProcs = append(existingProcs, string(c.Proc.ProcId)) + } + } + if diff := arrayContains(existingProcs, dependProcs); diff == "" { + break loop + } else { + logx.Errorf("Proc: %s is not running!", diff) + time.Sleep(time.Second * 1) + } + } else { + tryTimes++ + time.Sleep(time.Second * 5) + } + } else { + logx.Errorf("tried 15 times, client.GetRegisteredClient failed") + initChan <- false + return + } + } + } + go client.StartServer(nil) ss.bhClient = client @@ -112,9 +166,9 @@ // 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬� ss.QueryClusterStat() - if ss.ClusterStatus != "" { - ss.sqlDB.LogMode(true) - } + //if ss.ClusterStatus != "" { + //ss.sqlDB.LogMode(true) + //} initChan <- true <-q @@ -126,13 +180,20 @@ os.Exit(0) } -func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { +func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error { + sqlMsg := SqlMsg{ + Id: uuid.NewV4().String(), + Sql: sql, + } + + bMsg, _ := json.Marshal(sqlMsg) + var msg = ProcMessageEvent{ Owner: ss.ServerId, Target: targetId, Proc: ss.ProcName, Topic: ss.syncSqlTopic, - Payload: payload, + Payload: bMsg, } b, err := json.Marshal(msg) @@ -157,7 +218,7 @@ return err } - fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId) + logx.Debugf("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:%s", ss.ServerId) return ss.bhClient.Publish(serfSyncTopic, b) } @@ -177,7 +238,7 @@ // 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹� if string(busMsg.Topic) == ss.queryTableTopic { if ss.ClusterStatus == "master" { - fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�") + logx.Debugf("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�.") ss.handleSyncTableMessage(busMsg.Data) } } @@ -199,20 +260,28 @@ // 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master ss.clusterEventFn(EventCreateCluster) ss.ClusterStatus = "master" - ss.sqlDB.LogMode(true) + //ss.sqlDB.LogMode(true) case "join": // 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave ss.clusterEventFn(EventJoinCluster) ss.onJoinCluster() ss.ClusterStatus = "slave" - ss.sqlDB.LogMode(true) + //ss.sqlDB.LogMode(true) case "leave": // 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave ss.clusterEventFn(EventLeaveCluster) ss.ClusterStatus = "" - ss.sqlDB.LogMode(false) + //ss.sqlDB.LogMode(true) + case "slave2master": + ss.clusterEventFn(EventSlave2Master) + ss.ClusterStatus = "master" + //ss.sqlDB.LogMode(true) + case "master2slave": + ss.clusterEventFn(EventMaster2Slave) + ss.ClusterStatus = "slave" + //ss.sqlDB.LogMode(true) } } } @@ -240,7 +309,7 @@ err = tx.Exec(delSql).Error if err != nil { - fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + logx.Errorf("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触, %s", err.Error()) } } @@ -261,7 +330,7 @@ } // 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave -func (ss *SyncServer) QueryClusterStat() string { +func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply { clusterStatTopic := "/data/api-v/cluster/status" req := bhomeclient.Request{ Path: clusterStatTopic, @@ -272,14 +341,14 @@ if err != nil { fmt.Println("RequestTopic error", err.Error()) - return "" + return reply } ss.ClusterStatus = reply.Msg - fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus) + logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus) - return reply.Msg + return reply } func (ss *SyncServer) handleDbLoggerPrint() { @@ -293,7 +362,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) sendSize = 0 @@ -304,7 +373,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) } @@ -322,9 +391,25 @@ } } -func (ss *SyncServer) handleClusterMessage(msg []byte) { - //fmt.Println("clusterMessage:", string(msg)) - sql := string(msg) +func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) { + var msg SqlMsg + err := json.Unmarshal(clusterMsgData,&msg) + if err != nil { + logx.Errorf(" Unmarshal cluster message error, %s",err.Error()) + return + } + + // 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩� + if sqlMsgSeqCache.Exists(msg.Id) { + logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql) + return + } + + // 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈� + sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true) + + logx.Infof("clusterMessage:%s", msg.Sql) + sql := msg.Sql if len(sql) <= 0 { return @@ -354,17 +439,143 @@ } } +// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte func (ss *SyncServer) handleSyncTableMessage(msg []byte) error { + sizeLimit := 61440 targetId := string(msg) - fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) + + //fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId) sqls, err := DumpTables(ss.sqlDB, ss.syncTables) if err != nil { - fmt.Println("DumpTables error, ", err.Error()) + logx.Errorf("DumpTables error: %s", err.Error()) return err } + logx.Infof("DumpTables sql:%v", sqls) syncSql := strings.Join(sqls, ";") - err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + if len(syncSql) < sizeLimit { + err = ss.pubSyncSqlMessage(syncSql, targetId) + } else { + shard := "" + for _, sql := range sqls { + if len(shard)+len(sql) > sizeLimit { + err = ss.pubSyncSqlMessage(shard, targetId) + shard = "" + } + + shard = fmt.Sprintf("%s%s;", shard, sql) + } + + if len(shard) > 0 { + err = ss.pubSyncSqlMessage(shard, targetId) + } + } return err } + +func (ss *SyncServer) Print(values ...interface{}) { + var ( + level = values[0] + ) + + //fmt.Println("dblogger", values) + + if level == "sql" { + msgArr := gorm.LogFormatter(values...) + sql := msgArr[3].(string) + logx.Infof("sql: %v", sql) + sql = strings.TrimPrefix(sql, " ") + if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") { + affected := values[5].(int64) + if affected > 0 { //鎵ц鎴愬姛 + //鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛� + whereIdx := strings.Index(sql, "WHERE") + sqlWithTable := sql + if whereIdx > -1 { + sqlWithTable = sql[:whereIdx] + } + + //fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable) + + insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert + updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update + delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete + + if insertReg.MatchString(sqlWithTable) { + //fmt.Println("鎻掑叆鎿嶄綔") + for _, t := range agent.syncTables { + reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) + if reg.MatchString(sqlWithTable) { + //fmt.Println("灞炰簬鍚屾琛�:", t) + // 鍒ゆ柇鏄湪闆嗙兢鍐�, 鍚屾娑堟伅, 鍒ゆ柇涓ょ瑙掕壊, 涓洪伩鍏嶅叾浠栧嚭鐜扮姸鎬� + if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { + syncSqlChan <- sql + } + } + } + } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) { + //fmt.Println("鍒犻櫎鎴栬�呮洿鏂�") + for _, t := range agent.syncTables { + reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`) + if reg.MatchString(sqlWithTable) { + //fmt.Println("灞炰簬鍚屾琛�:", t) + if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" { + syncSqlChan <- sql + } + } + } + } + } + } + } else { + fmt.Println("dbLogger level!=sql") + } +} + +func arrayContains(list []string, arr []string) string { + if arr == nil || list == nil { + return "" + } + + for _, s := range arr { + isExist := false + for _, t := range list { + if s == t { + isExist = true + break + } + } + + if !isExist { + return s + } + } + + return "" +} + +type NodeInfo struct { + NodeID string `json:"node_id,omitempty"` + NodeIp string `json:"node_ip,omitempty"` + NodeName string `json:"node_name,omitempty"` + ClusterID string `json:"cluster_id"` + CreateTime string `json:"create_time"` + DeviceType string `json:"device_type"` + DriftState string `json:"drift_state"` + Online bool `json:"online"` +} + +func QueryClusterStatusAndNodeQuantity() (string, int) { + reply := agent.QueryClusterStat() + if reply == nil { + return "", 0 + } + var nodes []NodeInfo + err := mapstructure.Decode(reply.Data, &nodes) + if err != nil { + logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err) + return reply.Msg, 0 + } + return reply.Msg, len(nodes) +} -- Gitblit v1.8.0