From 8324f872ef3a4d0c978a9b1d062800c6a1701c12 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 01 十二月 2023 09:58:17 +0800
Subject: [PATCH] fix

---
 serf/sync.go |  187 +++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 164 insertions(+), 23 deletions(-)

diff --git a/serf/sync.go b/serf/sync.go
index c6337cf..30be0ee 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,15 +1,17 @@
 package serf
 
 import (
-	"apsClient/pkg/logx"
 	"context"
 	"encoding/json"
 	"fmt"
 	"os"
 	"os/signal"
+	"regexp"
 	"strings"
 	"syscall"
 	"time"
+
+	"apsClient/pkg/logx"
 
 	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/bhomeclient.git"
@@ -17,6 +19,9 @@
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/jinzhu/gorm"
+	"github.com/satori/go.uuid"
+	"github.com/mitchellh/mapstructure"
+	"github.com/muesli/cache2go"
 )
 
 var (
@@ -24,6 +29,8 @@
 	dependProcs = []string{
 		bhomeclient.Proc_System_Service,
 	}
+
+	sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
 )
 
 const (
@@ -42,6 +49,12 @@
 	Proc    string `json:"procName"` // 杩涚▼鍚�
 	Topic   string `json:"topic"`    // 涓婚
 	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SqlMsg struct {
+	Id      string
+	Sql     string
+	Version string
 }
 
 type SyncServer struct {
@@ -65,9 +78,10 @@
 	agent.queryTableTopic = procName + "/serf/query/sqls"
 
 	// 璁剧疆鏃ュ織鍥炶皟
-	db.SetLogger(&DbLogger{})
+	db.SetLogger(&agent)
+
 	// 鍏堝叧闂棩蹇�
-	db.LogMode(false)
+	//db.LogMode(false)
 
 	return &agent
 }
@@ -152,9 +166,9 @@
 	// 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬�
 	ss.QueryClusterStat()
 
-	if ss.ClusterStatus != "" {
-		ss.sqlDB.LogMode(true)
-	}
+	//if ss.ClusterStatus != "" {
+	//ss.sqlDB.LogMode(true)
+	//}
 
 	initChan <- true
 	<-q
@@ -166,13 +180,20 @@
 	os.Exit(0)
 }
 
-func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
+	sqlMsg := SqlMsg{
+		Id:  uuid.NewV4().String(),
+		Sql: sql,
+	}
+
+	bMsg, _ := json.Marshal(sqlMsg)
+
 	var msg = ProcMessageEvent{
 		Owner:   ss.ServerId,
 		Target:  targetId,
 		Proc:    ss.ProcName,
 		Topic:   ss.syncSqlTopic,
-		Payload: payload,
+		Payload: bMsg,
 	}
 
 	b, err := json.Marshal(msg)
@@ -239,28 +260,28 @@
 						// 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master
 						ss.clusterEventFn(EventCreateCluster)
 						ss.ClusterStatus = "master"
-						ss.sqlDB.LogMode(true)
+						//ss.sqlDB.LogMode(true)
 
 					case "join":
 						// 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
 						ss.clusterEventFn(EventJoinCluster)
 						ss.onJoinCluster()
 						ss.ClusterStatus = "slave"
-						ss.sqlDB.LogMode(true)
+						//ss.sqlDB.LogMode(true)
 
 					case "leave":
 						// 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
 						ss.clusterEventFn(EventLeaveCluster)
 						ss.ClusterStatus = ""
-						ss.sqlDB.LogMode(false)
+						//ss.sqlDB.LogMode(true)
 					case "slave2master":
 						ss.clusterEventFn(EventSlave2Master)
 						ss.ClusterStatus = "master"
-						ss.sqlDB.LogMode(true)
+						//ss.sqlDB.LogMode(true)
 					case "master2slave":
 						ss.clusterEventFn(EventMaster2Slave)
 						ss.ClusterStatus = "slave"
-						ss.sqlDB.LogMode(true)
+						//ss.sqlDB.LogMode(true)
 					}
 				}
 			}
@@ -309,7 +330,7 @@
 }
 
 // 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave
-func (ss *SyncServer) QueryClusterStat() string {
+func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
 	clusterStatTopic := "/data/api-v/cluster/status"
 	req := bhomeclient.Request{
 		Path:   clusterStatTopic,
@@ -320,14 +341,14 @@
 	if err != nil {
 		fmt.Println("RequestTopic error", err.Error())
 
-		return ""
+		return reply
 	}
 
 	ss.ClusterStatus = reply.Msg
 
 	logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus)
 
-	return reply.Msg
+	return reply
 }
 
 func (ss *SyncServer) handleDbLoggerPrint() {
@@ -341,7 +362,7 @@
 				syncSql := strings.Join(sqlBuf, "")
 
 				//fmt.Println("鍚屾sql璇彞:", syncSql)
-				ss.pubSyncSqlMessage([]byte(syncSql), "")
+				ss.pubSyncSqlMessage(syncSql, "")
 
 				sqlBuf = append([]string{})
 				sendSize = 0
@@ -352,7 +373,7 @@
 					syncSql := strings.Join(sqlBuf, "")
 					//fmt.Println("鍚屾sql璇彞:", syncSql)
 
-					ss.pubSyncSqlMessage([]byte(syncSql), "")
+					ss.pubSyncSqlMessage(syncSql, "")
 
 					sqlBuf = append([]string{})
 				}
@@ -370,9 +391,25 @@
 	}
 }
 
-func (ss *SyncServer) handleClusterMessage(msg []byte) {
-	//fmt.Println("clusterMessage:", string(msg))
-	sql := string(msg)
+func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
+	var msg SqlMsg
+	err := json.Unmarshal(clusterMsgData,&msg)
+	if err != nil {
+		logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
+		return
+	}
+
+	// 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩�
+	if sqlMsgSeqCache.Exists(msg.Id) {
+		logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql)
+		return
+	}
+
+	// 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈�
+	sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
+
+	logx.Infof("clusterMessage:%s", msg.Sql)
+	sql := msg.Sql
 
 	if len(sql) <= 0 {
 		return
@@ -402,19 +439,98 @@
 	}
 }
 
+// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte
 func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+	sizeLimit := 61440
 	targetId := string(msg)
+
 	//fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
 	sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
 	if err != nil {
-		fmt.Println("DumpTables error, ", err.Error())
+		logx.Errorf("DumpTables error: %s", err.Error())
 		return err
 	}
 
+	logx.Infof("DumpTables sql:%v", sqls)
 	syncSql := strings.Join(sqls, ";")
-	err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+	if len(syncSql) < sizeLimit {
+		err = ss.pubSyncSqlMessage(syncSql, targetId)
+	} else {
+		shard := ""
+		for _, sql := range sqls {
+			if len(shard)+len(sql) > sizeLimit {
+				err = ss.pubSyncSqlMessage(shard, targetId)
+				shard = ""
+			}
+
+			shard = fmt.Sprintf("%s%s;", shard, sql)
+		}
+
+		if len(shard) > 0 {
+			err = ss.pubSyncSqlMessage(shard, targetId)
+		}
+	}
 
 	return err
+}
+
+func (ss *SyncServer) Print(values ...interface{}) {
+	var (
+		level = values[0]
+	)
+
+	//fmt.Println("dblogger", values)
+
+	if level == "sql" {
+		msgArr := gorm.LogFormatter(values...)
+		sql := msgArr[3].(string)
+		logx.Infof("sql: %v", sql)
+		sql = strings.TrimPrefix(sql, " ")
+		if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
+			affected := values[5].(int64)
+			if affected > 0 { //鎵ц鎴愬姛
+				//鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛�
+				whereIdx := strings.Index(sql, "WHERE")
+				sqlWithTable := sql
+				if whereIdx > -1 {
+					sqlWithTable = sql[:whereIdx]
+				}
+
+				//fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable)
+
+				insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
+				updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
+				delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
+
+				if insertReg.MatchString(sqlWithTable) {
+					//fmt.Println("鎻掑叆鎿嶄綔")
+					for _, t := range agent.syncTables {
+						reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+						if reg.MatchString(sqlWithTable) {
+							//fmt.Println("灞炰簬鍚屾琛�:", t)
+							// 鍒ゆ柇鏄湪闆嗙兢鍐�, 鍚屾娑堟伅, 鍒ゆ柇涓ょ瑙掕壊, 涓洪伩鍏嶅叾浠栧嚭鐜扮姸鎬�
+							if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
+								syncSqlChan <- sql
+							}
+						}
+					}
+				} else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
+					//fmt.Println("鍒犻櫎鎴栬�呮洿鏂�")
+					for _, t := range agent.syncTables {
+						reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+						if reg.MatchString(sqlWithTable) {
+							//fmt.Println("灞炰簬鍚屾琛�:", t)
+							if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
+								syncSqlChan <- sql
+							}
+						}
+					}
+				}
+			}
+		}
+	} else {
+		fmt.Println("dbLogger level!=sql")
+	}
 }
 
 func arrayContains(list []string, arr []string) string {
@@ -438,3 +554,28 @@
 
 	return ""
 }
+
+type NodeInfo struct {
+	NodeID     string `json:"node_id,omitempty"`
+	NodeIp     string `json:"node_ip,omitempty"`
+	NodeName   string `json:"node_name,omitempty"`
+	ClusterID  string `json:"cluster_id"`
+	CreateTime string `json:"create_time"`
+	DeviceType string `json:"device_type"`
+	DriftState string `json:"drift_state"`
+	Online     bool   `json:"online"`
+}
+
+func QueryClusterStatusAndNodeQuantity() (string, int) {
+	reply := agent.QueryClusterStat()
+	if reply == nil {
+		return "", 0
+	}
+	var nodes []NodeInfo
+	err := mapstructure.Decode(reply.Data, &nodes)
+	if err != nil {
+		logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
+		return reply.Msg, 0
+	}
+	return reply.Msg, len(nodes)
+}

--
Gitblit v1.8.0