From 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期日, 08 十月 2023 11:24:37 +0800
Subject: [PATCH] 修复集群同步功能

---
 system-service/serf/serf.go |   23 +++++++++++++++++++----
 1 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index 55ecb72..0a14a19 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -23,11 +23,15 @@
 	UserEventSyncVirtualIp          = "SyncVirtualIp"              //婕傜Щip淇敼
 	UserEventSyncRegisterInfo       = "SyncRegisterInfo"           //鍚屾娉ㄥ唽淇℃伅
 	DataSystemSerfSubscribe         = "data-system-serf-subscribe" //鍚刟pp浠巗erf璁㈤槄娑堟伅
+	UserEventSyncMessage            = "SyncMessageForProc"         // 涓哄叾浠栬繘绋嬪悓姝ユ秷鎭�
 	TcpTransportPort                = 30194                        //tcp浼犺緭澶ф暟鎹噺鎺ュ彛
+
+	SUserEventSyncMessage
 )
 
 var SyncDbTablePersonCacheChan = make(chan []byte, 512)
 var SyncVirtualIpChan = make(chan []byte, 512)
+var SyncProcMessageChan = make(chan []byte, 512)
 
 func HandleSerfEvent(event serf.Event) {
 	switch ev := event.(type) {
@@ -42,6 +46,9 @@
 			HandleSyncRegisterInfo(ev)
 		} else if ev.Name == DataSystemSerfSubscribe {
 			HandleDataSystemSerfSub(ev)
+		} else if ev.Name == UserEventSyncMessage {
+			logger.Debug("鎺ユ敹鍒癝yncMessageForProc")
+			HandleUserEventSyncMessage(ev)
 		}
 	case *serf.Query:
 		if ev.Name == QueryEventUpdateDBData {
@@ -100,6 +107,14 @@
 type SqlUserEvent struct {
 	Owner string   `json:"owner"`
 	Sql   []string `json:"sql"`
+}
+
+type ProcMessageEvent struct {
+	Owner   string `json:"owner"`    // 鍙戦�佽��
+	Target  string `json:"target"`   // 鎸囧畾鎺ユ敹鑰�
+	Proc    string `json:"procName"` // 杩涚▼鍚�
+	Topic   string `json:"topic"`    // 涓婚
+	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
 }
 
 type TableDesc struct {
@@ -193,7 +208,7 @@
 	mbs := a.GroupMembers(clusterId)
 	var specmembername string
 	for _, m := range mbs {
-		logger.Info("m", m)
+		logger.Info("member", m)
 		if m.Name != config.Server.AnalyServerId { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad
 			if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") {
 				if strings.HasPrefix(m.Name, "DSVAD") {
@@ -206,7 +221,7 @@
 			}
 		}
 	}
-	logger.Info("mbs:", mbs, "specmembername:", specmembername)
+	logger.Info("members:", mbs, "specmembername:", specmembername)
 	if specmembername == "" { //濡傛灉鏈壘鍒扮洰鏍囪妭鐐癸紝璇存槑褰撳墠闆嗙兢鍐呴櫎浜嗘湰鑺傜偣锛屾病鏈夊叾浠栧彲鐢ㄨ妭鐐�
 		return nil, errors.New("specmembername not found")
 	}
@@ -245,8 +260,8 @@
 				logger.Info("Query response's len:", len(msg))
 				err := json.Unmarshal(msg, &dumpSqls)
 				if err == nil {
-					logger.Error("dumpSql:", dumpSqls)
-					logger.Error("data dump success")
+					//logger.Error("dumpSql:", dumpSqls)
+					logger.Debug("data dump success")
 				}
 				return
 			}

--
Gitblit v1.8.0