From 8324f872ef3a4d0c978a9b1d062800c6a1701c12 Mon Sep 17 00:00:00 2001 From: zhangqian <zhangqian@123.com> Date: 星期五, 01 十二月 2023 09:58:17 +0800 Subject: [PATCH] fix --- serf/sync.go | 90 +++++++++++++++++++++++++++++++++++++------- 1 files changed, 75 insertions(+), 15 deletions(-) diff --git a/serf/sync.go b/serf/sync.go index d8ef4c7..30be0ee 100644 --- a/serf/sync.go +++ b/serf/sync.go @@ -1,7 +1,6 @@ package serf import ( - "apsClient/pkg/logx" "context" "encoding/json" "fmt" @@ -12,12 +11,17 @@ "syscall" "time" + "apsClient/pkg/logx" + "basic.com/pubsub/protomsg.git" "basic.com/valib/bhomeclient.git" "basic.com/valib/bhomedbapi.git" "github.com/gogo/protobuf/proto" "github.com/jinzhu/gorm" + "github.com/satori/go.uuid" + "github.com/mitchellh/mapstructure" + "github.com/muesli/cache2go" ) var ( @@ -25,6 +29,8 @@ dependProcs = []string{ bhomeclient.Proc_System_Service, } + + sqlMsgSeqCache = cache2go.Cache("syncSqlMsg") ) const ( @@ -43,6 +49,12 @@ Proc string `json:"procName"` // 杩涚▼鍚� Topic string `json:"topic"` // 涓婚 Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 +} + +type SqlMsg struct { + Id string + Sql string + Version string } type SyncServer struct { @@ -168,13 +180,20 @@ os.Exit(0) } -func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { +func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error { + sqlMsg := SqlMsg{ + Id: uuid.NewV4().String(), + Sql: sql, + } + + bMsg, _ := json.Marshal(sqlMsg) + var msg = ProcMessageEvent{ Owner: ss.ServerId, Target: targetId, Proc: ss.ProcName, Topic: ss.syncSqlTopic, - Payload: payload, + Payload: bMsg, } b, err := json.Marshal(msg) @@ -311,7 +330,7 @@ } // 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave -func (ss *SyncServer) QueryClusterStat() string { +func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply { clusterStatTopic := "/data/api-v/cluster/status" req := bhomeclient.Request{ Path: clusterStatTopic, @@ -322,14 +341,14 @@ if err != nil { fmt.Println("RequestTopic error", err.Error()) - return "" + return reply } ss.ClusterStatus = reply.Msg logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus) - return reply.Msg + return reply } func (ss *SyncServer) handleDbLoggerPrint() { @@ -343,7 +362,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) sendSize = 0 @@ -354,7 +373,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) } @@ -372,9 +391,25 @@ } } -func (ss *SyncServer) handleClusterMessage(msg []byte) { - logx.Infof("clusterMessage:", string(msg)) - sql := string(msg) +func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) { + var msg SqlMsg + err := json.Unmarshal(clusterMsgData,&msg) + if err != nil { + logx.Errorf(" Unmarshal cluster message error, %s",err.Error()) + return + } + + // 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩� + if sqlMsgSeqCache.Exists(msg.Id) { + logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql) + return + } + + // 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈� + sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true) + + logx.Infof("clusterMessage:%s", msg.Sql) + sql := msg.Sql if len(sql) <= 0 { return @@ -419,12 +454,12 @@ logx.Infof("DumpTables sql:%v", sqls) syncSql := strings.Join(sqls, ";") if len(syncSql) < sizeLimit { - err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + err = ss.pubSyncSqlMessage(syncSql, targetId) } else { shard := "" for _, sql := range sqls { if len(shard)+len(sql) > sizeLimit { - err = ss.pubSyncSqlMessage([]byte(shard), targetId) + err = ss.pubSyncSqlMessage(shard, targetId) shard = "" } @@ -432,14 +467,14 @@ } if len(shard) > 0 { - err = ss.pubSyncSqlMessage([]byte(shard), targetId) + err = ss.pubSyncSqlMessage(shard, targetId) } } return err } -func (ss *SyncServer)Print(values ...interface{}) { +func (ss *SyncServer) Print(values ...interface{}) { var ( level = values[0] ) @@ -519,3 +554,28 @@ return "" } + +type NodeInfo struct { + NodeID string `json:"node_id,omitempty"` + NodeIp string `json:"node_ip,omitempty"` + NodeName string `json:"node_name,omitempty"` + ClusterID string `json:"cluster_id"` + CreateTime string `json:"create_time"` + DeviceType string `json:"device_type"` + DriftState string `json:"drift_state"` + Online bool `json:"online"` +} + +func QueryClusterStatusAndNodeQuantity() (string, int) { + reply := agent.QueryClusterStat() + if reply == nil { + return "", 0 + } + var nodes []NodeInfo + err := mapstructure.Decode(reply.Data, &nodes) + if err != nil { + logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err) + return reply.Msg, 0 + } + return reply.Msg, len(nodes) +} -- Gitblit v1.8.0