From 80c3bf9f605eddfab91bb72aee99172b0f09fb5a Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期二, 14 十一月 2023 15:10:57 +0800
Subject: [PATCH] fix
---
serf/sync.go | 263 +++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 237 insertions(+), 26 deletions(-)
diff --git a/serf/sync.go b/serf/sync.go
index b49eebf..30be0ee 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -6,9 +6,12 @@
"fmt"
"os"
"os/signal"
+ "regexp"
"strings"
"syscall"
"time"
+
+ "apsClient/pkg/logx"
"basic.com/pubsub/protomsg.git"
"basic.com/valib/bhomeclient.git"
@@ -16,10 +19,18 @@
"github.com/gogo/protobuf/proto"
"github.com/jinzhu/gorm"
+ "github.com/satori/go.uuid"
+ "github.com/mitchellh/mapstructure"
+ "github.com/muesli/cache2go"
)
var (
- agent = SyncServer{}
+ agent = SyncServer{}
+ dependProcs = []string{
+ bhomeclient.Proc_System_Service,
+ }
+
+ sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
)
const (
@@ -38,6 +49,12 @@
Proc string `json:"procName"` // 杩涚▼鍚�
Topic string `json:"topic"` // 涓婚
Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SqlMsg struct {
+ Id string
+ Sql string
+ Version string
}
type SyncServer struct {
@@ -61,9 +78,10 @@
agent.queryTableTopic = procName + "/serf/query/sqls"
// 璁剧疆鏃ュ織鍥炶皟
- db.SetLogger(&DbLogger{})
+ db.SetLogger(&agent)
+
// 鍏堝叧闂棩蹇�
- db.LogMode(false)
+ //db.LogMode(false)
return &agent
}
@@ -101,6 +119,42 @@
bhomedbapi.InitDoReq(client.RequestOnly)
//bhomedbapi.InitLog(logger.Debug)
+ // 闇�瑕佺瓑寰卻ystem-service杩涚▼鎴愬姛鍚姩鍚庯紝鎵嶈兘鑾峰彇闆嗙兢鐘舵��(鎴栬�呬繚璇佺▼搴忓惎鍔ㄦ椂鑾峰彇鍒版纭殑鐘舵��)
+ tryTimes := 0
+loop:
+ for {
+ select {
+ case <-q:
+ initChan <- false
+ return
+ default:
+ if tryTimes < 15 {
+ clients, err := client.GetRegisteredClient()
+ if err == nil && len(clients) > 0 {
+ var existingProcs []string
+ for _, c := range clients {
+ if c.Online {
+ existingProcs = append(existingProcs, string(c.Proc.ProcId))
+ }
+ }
+ if diff := arrayContains(existingProcs, dependProcs); diff == "" {
+ break loop
+ } else {
+ logx.Errorf("Proc: %s is not running!", diff)
+ time.Sleep(time.Second * 1)
+ }
+ } else {
+ tryTimes++
+ time.Sleep(time.Second * 5)
+ }
+ } else {
+ logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
+ initChan <- false
+ return
+ }
+ }
+ }
+
go client.StartServer(nil)
ss.bhClient = client
@@ -112,9 +166,9 @@
// 鍚姩鍚庢煡璇竴娆¢泦缇ょ姸鎬�
ss.QueryClusterStat()
- if ss.ClusterStatus != "" {
- ss.sqlDB.LogMode(true)
- }
+ //if ss.ClusterStatus != "" {
+ //ss.sqlDB.LogMode(true)
+ //}
initChan <- true
<-q
@@ -126,13 +180,20 @@
os.Exit(0)
}
-func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
+ sqlMsg := SqlMsg{
+ Id: uuid.NewV4().String(),
+ Sql: sql,
+ }
+
+ bMsg, _ := json.Marshal(sqlMsg)
+
var msg = ProcMessageEvent{
Owner: ss.ServerId,
Target: targetId,
Proc: ss.ProcName,
Topic: ss.syncSqlTopic,
- Payload: payload,
+ Payload: bMsg,
}
b, err := json.Marshal(msg)
@@ -157,7 +218,7 @@
return err
}
- fmt.Println("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:", ss.ServerId)
+ logx.Debugf("鍔犲叆闆嗙兢, 璇锋眰鍚屾鍏ㄩ噺鏁版嵁,id:%s", ss.ServerId)
return ss.bhClient.Publish(serfSyncTopic, b)
}
@@ -177,7 +238,7 @@
// 澶勭悊鍚屾鍏ㄩ噺鏁版嵁鐨勮姹�
if string(busMsg.Topic) == ss.queryTableTopic {
if ss.ClusterStatus == "master" {
- fmt.Println("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�")
+ logx.Debugf("鎺ユ敹鍒板悓姝ュ叏閲忔暟鎹姹�.")
ss.handleSyncTableMessage(busMsg.Data)
}
}
@@ -199,20 +260,28 @@
// 鍒涘缓闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊master
ss.clusterEventFn(EventCreateCluster)
ss.ClusterStatus = "master"
- ss.sqlDB.LogMode(true)
+ //ss.sqlDB.LogMode(true)
case "join":
// 鍔犲叆闆嗙兢, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
ss.clusterEventFn(EventJoinCluster)
ss.onJoinCluster()
ss.ClusterStatus = "slave"
- ss.sqlDB.LogMode(true)
+ //ss.sqlDB.LogMode(true)
case "leave":
// 閫�鍑洪泦缇�, 寮�鍚棩蹇楄窡韪�, 璁剧疆瑙掕壊slave
ss.clusterEventFn(EventLeaveCluster)
ss.ClusterStatus = ""
- ss.sqlDB.LogMode(false)
+ //ss.sqlDB.LogMode(true)
+ case "slave2master":
+ ss.clusterEventFn(EventSlave2Master)
+ ss.ClusterStatus = "master"
+ //ss.sqlDB.LogMode(true)
+ case "master2slave":
+ ss.clusterEventFn(EventMaster2Slave)
+ ss.ClusterStatus = "slave"
+ //ss.sqlDB.LogMode(true)
}
}
}
@@ -240,7 +309,7 @@
err = tx.Exec(delSql).Error
if err != nil {
- fmt.Println("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+ logx.Errorf("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触, %s", err.Error())
}
}
@@ -261,7 +330,7 @@
}
// 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave
-func (ss *SyncServer) QueryClusterStat() string {
+func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
clusterStatTopic := "/data/api-v/cluster/status"
req := bhomeclient.Request{
Path: clusterStatTopic,
@@ -272,14 +341,14 @@
if err != nil {
fmt.Println("RequestTopic error", err.Error())
- return ""
+ return reply
}
ss.ClusterStatus = reply.Msg
- fmt.Println("褰撳墠闆嗙兢鐘舵��:", ss.ClusterStatus)
+ logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus)
- return reply.Msg
+ return reply
}
func (ss *SyncServer) handleDbLoggerPrint() {
@@ -293,7 +362,7 @@
syncSql := strings.Join(sqlBuf, "")
//fmt.Println("鍚屾sql璇彞:", syncSql)
- ss.pubSyncSqlMessage([]byte(syncSql), "")
+ ss.pubSyncSqlMessage(syncSql, "")
sqlBuf = append([]string{})
sendSize = 0
@@ -304,7 +373,7 @@
syncSql := strings.Join(sqlBuf, "")
//fmt.Println("鍚屾sql璇彞:", syncSql)
- ss.pubSyncSqlMessage([]byte(syncSql), "")
+ ss.pubSyncSqlMessage(syncSql, "")
sqlBuf = append([]string{})
}
@@ -322,9 +391,25 @@
}
}
-func (ss *SyncServer) handleClusterMessage(msg []byte) {
- //fmt.Println("clusterMessage:", string(msg))
- sql := string(msg)
+func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
+ var msg SqlMsg
+ err := json.Unmarshal(clusterMsgData,&msg)
+ if err != nil {
+ logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
+ return
+ }
+
+ // 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩�
+ if sqlMsgSeqCache.Exists(msg.Id) {
+ logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql)
+ return
+ }
+
+ // 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈�
+ sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
+
+ logx.Infof("clusterMessage:%s", msg.Sql)
+ sql := msg.Sql
if len(sql) <= 0 {
return
@@ -354,17 +439,143 @@
}
}
+// serf 鍚屾鏁版嵁鐨勯檺鍒朵负92160 byte
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
+ sizeLimit := 61440
targetId := string(msg)
- fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
+
+ //fmt.Println("鍚屾鍏ㄩ噺鏁版嵁缁欒妭鐐�:", targetId)
sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
if err != nil {
- fmt.Println("DumpTables error, ", err.Error())
+ logx.Errorf("DumpTables error: %s", err.Error())
return err
}
+ logx.Infof("DumpTables sql:%v", sqls)
syncSql := strings.Join(sqls, ";")
- err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+ if len(syncSql) < sizeLimit {
+ err = ss.pubSyncSqlMessage(syncSql, targetId)
+ } else {
+ shard := ""
+ for _, sql := range sqls {
+ if len(shard)+len(sql) > sizeLimit {
+ err = ss.pubSyncSqlMessage(shard, targetId)
+ shard = ""
+ }
+
+ shard = fmt.Sprintf("%s%s;", shard, sql)
+ }
+
+ if len(shard) > 0 {
+ err = ss.pubSyncSqlMessage(shard, targetId)
+ }
+ }
return err
}
+
+func (ss *SyncServer) Print(values ...interface{}) {
+ var (
+ level = values[0]
+ )
+
+ //fmt.Println("dblogger", values)
+
+ if level == "sql" {
+ msgArr := gorm.LogFormatter(values...)
+ sql := msgArr[3].(string)
+ logx.Infof("sql: %v", sql)
+ sql = strings.TrimPrefix(sql, " ")
+ if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
+ affected := values[5].(int64)
+ if affected > 0 { //鎵ц鎴愬姛
+ //鍒ゆ柇鎿嶄綔鐨勬槸鍝紶琛�
+ whereIdx := strings.Index(sql, "WHERE")
+ sqlWithTable := sql
+ if whereIdx > -1 {
+ sqlWithTable = sql[:whereIdx]
+ }
+
+ //fmt.Println("鍒ゆ柇鏄摢寮犺〃 sqlWithTable:", sqlWithTable)
+
+ insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
+ updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
+ delReg := regexp.MustCompile(`^\s*(?i:delete)\s`) //delete
+
+ if insertReg.MatchString(sqlWithTable) {
+ //fmt.Println("鎻掑叆鎿嶄綔")
+ for _, t := range agent.syncTables {
+ reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+ if reg.MatchString(sqlWithTable) {
+ //fmt.Println("灞炰簬鍚屾琛�:", t)
+ // 鍒ゆ柇鏄湪闆嗙兢鍐�, 鍚屾娑堟伅, 鍒ゆ柇涓ょ瑙掕壊, 涓洪伩鍏嶅叾浠栧嚭鐜扮姸鎬�
+ if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
+ syncSqlChan <- sql
+ }
+ }
+ }
+ } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
+ //fmt.Println("鍒犻櫎鎴栬�呮洿鏂�")
+ for _, t := range agent.syncTables {
+ reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
+ if reg.MatchString(sqlWithTable) {
+ //fmt.Println("灞炰簬鍚屾琛�:", t)
+ if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
+ syncSqlChan <- sql
+ }
+ }
+ }
+ }
+ }
+ }
+ } else {
+ fmt.Println("dbLogger level!=sql")
+ }
+}
+
+func arrayContains(list []string, arr []string) string {
+ if arr == nil || list == nil {
+ return ""
+ }
+
+ for _, s := range arr {
+ isExist := false
+ for _, t := range list {
+ if s == t {
+ isExist = true
+ break
+ }
+ }
+
+ if !isExist {
+ return s
+ }
+ }
+
+ return ""
+}
+
+type NodeInfo struct {
+ NodeID string `json:"node_id,omitempty"`
+ NodeIp string `json:"node_ip,omitempty"`
+ NodeName string `json:"node_name,omitempty"`
+ ClusterID string `json:"cluster_id"`
+ CreateTime string `json:"create_time"`
+ DeviceType string `json:"device_type"`
+ DriftState string `json:"drift_state"`
+ Online bool `json:"online"`
+}
+
+func QueryClusterStatusAndNodeQuantity() (string, int) {
+ reply := agent.QueryClusterStat()
+ if reply == nil {
+ return "", 0
+ }
+ var nodes []NodeInfo
+ err := mapstructure.Decode(reply.Data, &nodes)
+ if err != nil {
+ logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
+ return reply.Msg, 0
+ }
+ return reply.Msg, len(nodes)
+}
--
Gitblit v1.8.0