From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 05 九月 2023 09:58:13 +0800
Subject: [PATCH] 修复编译

---
 system-service/serf/handler.go |  528 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 264 insertions(+), 264 deletions(-)

diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index a6fc49b..6c72b22 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -1,264 +1,264 @@
-package serf
-
-import (
-	"basic.com/valib/logger.git"
-	"basic.com/valib/serf.git/serf"
-	"encoding/json"
-	"github.com/golang/protobuf/proto"
-	"github.com/hashicorp/memberlist"
-	"github.com/satori/go.uuid"
-	"path/filepath"
-	"reflect"
-	"runtime"
-	"strconv"
-	"strings"
-	"time"
-	"vamicro/config"
-	"vamicro/system-service/bhome_msg_dev"
-)
-
-type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
-
-var rpcHandlers map[string]RpcHandle
-
-func init() {
-	rpcHandlers = make(map[string]RpcHandle)
-}
-
-// RegisterRpcHandles
-func RegisterRpcHandles(fs ...RpcHandle) {
-	for _, f := range fs {
-		name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
-		nameEnd := filepath.Ext(name)
-		name = strings.TrimPrefix(nameEnd, ".")
-		if _, ok := rpcHandlers[name]; !ok {
-			rpcHandlers[name] = f
-		}
-	}
-}
-
-/*****************************UserEvent***************************************/
-func HandleUserEventSyncSql(ev serf.UserEvent) {
-	logger.Info("receive a UserEventSyncSql event")
-	var sqlUe SqlUserEvent
-	err := json.Unmarshal(ev.Payload, &sqlUe)
-	if err != nil {
-		logger.Error("sqlUe unmarshal err:", err)
-		return
-	}
-
-	logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
-	if sqlUe.Owner != config.Server.AnalyServerId {
-		go func() {
-			flag, e := executeSqlByGorm(sqlUe.Sql)
-			logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
-			logLT := strconv.Itoa(int(ev.LTime))
-			logT := time.Now().Format("2006-01-02 15:04:05")
-			logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
-			logResult := "0"
-			if flag {
-				logResult = "1"
-			}
-			logErr := ""
-			if e != nil {
-				logErr = e.Error()
-			}
-			executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
-		}()
-	}
-}
-
-func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
-	logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
-	SyncDbTablePersonCacheChan <- ev.Payload
-}
-
-func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
-	logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
-	SyncVirtualIpChan <- ev.Payload
-}
-
-//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑
-func HandleSyncRegisterInfo(ev serf.UserEvent) {
-	logger.Debug("HandleSyncRegisterInfo")
-	var si bhome_msg_dev.MsgDevRegisterInfo
-	if err := proto.Unmarshal(ev.Payload, &si); err == nil {
-		logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
-		if string(si.DevId) != config.Server.AnalyServerId {
-			compareRPool(&si)
-		}
-	} else {
-		logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
-	}
-}
-
-func HandleDataSystemSerfSub(ev serf.UserEvent) {
-	h := GetBusHandle()
-	if h == nil {
-		logger.Error("HandleDataSystemSerfSub bus handle is nil")
-		return
-	}
-	err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
-	if err != nil {
-		logger.Error("HandleDataSystemSerfSub pub err:", err)
-	}
-}
-
-/*****************************Query***************************************/
-func HandleQueryEventUpdateDBData(ev *serf.Query) {
-	logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
-	var fromP QueryTableDataParam
-	err := json.Unmarshal(ev.Payload, &fromP)
-	if err != nil {
-		logger.Error("Query tableNames unmarshal err")
-		if err := ev.Respond([]byte("request unmarshal err")); err != nil {
-			logger.Error("query.Respond err: %s\n", err)
-			return
-		}
-
-		return
-	}
-	logger.Info("Query tableNames:", fromP.Tables)
-	datas, err := DumpTables(fromP.Tables)
-	if err != nil {
-		logger.Error("queryByGorm err:", err)
-		if err := ev.Respond([]byte("queryByGorm err")); err != nil {
-			logger.Error("query.Respond err: %s\n", err)
-			return
-		}
-		return
-	}
-	bytesReturn, err := json.Marshal(datas)
-	logger.Info("results.len: ", len(bytesReturn))
-
-	var targetNode *memberlist.Node
-	nodes := Agent.Serf().Memberlist().Members()
-	if nodes != nil && len(nodes) > 0 {
-		for _, n := range nodes {
-			if n.Name == fromP.From {
-				targetNode = n
-				break
-			}
-		}
-	}
-	logger.Debug("targetNode:", targetNode.Name)
-	if targetNode != nil {
-		go func() {
-			addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
-			sendErr := rawSendTcpMsg(addr, bytesReturn)
-
-			logLT := strconv.Itoa(int(ev.LTime))
-			logT := time.Now().Format("2006-01-02 15:04:05")
-			logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
-			logResult := "0"
-			logErr := ""
-			if sendErr == nil {
-				logResult = "1"
-				logger.Debug("sendToTcp success")
-			} else {
-				logErr = sendErr.Error()
-				logger.Debug("sendToTcp err:", sendErr)
-			}
-
-			executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
-		}()
-	} else {
-		logger.Debug("targetNode is nil")
-	}
-}
-
-//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰
-func HandleOtherQuery(ev *serf.Query) {
-	var reqBody RequestSerfTopicMsg
-	var ret []byte
-	if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
-		ret = []byte(err.Error())
-	} else {
-		if err, data := QueryLocalProc(reqBody); err != nil {
-			ret = []byte(err.Error())
-		} else {
-			b, e := json.Marshal(data)
-			if e != nil {
-				ret = []byte(e.Error())
-			} else {
-				ret = b
-			}
-		}
-	}
-
-	if err := ev.Respond(ret); err != nil {
-		logger.Debug("HandleOtherQuery err:", err)
-		return
-	}
-}
-
-func HandleQueryRpc(ev *serf.Query) {
-	var ret []byte
-	var arg RpcParamTopic
-	err := json.Unmarshal(ev.Payload, &arg)
-	if err == nil {
-		if f, ok := rpcHandlers[arg.Topic]; ok {
-			resp, e := f(arg)
-			if e == nil {
-				if data, me := json.Marshal(resp); me == nil {
-					ret = data
-				} else {
-					logger.Debug("marshal resp err:", e)
-				}
-			} else {
-				logger.Debug("call f err:", e)
-			}
-		} else {
-			logger.Debug("rpcHandlers not contains topic:", arg.Topic)
-		}
-	} else {
-		logger.Debug("unmarshal RpcParamTopic err:", err)
-	}
-	if rErr := ev.Respond(ret); rErr != nil {
-		logger.Debug("HandleQueryRpc err:", rErr)
-	}
-}
-
-func HandleEventMemberLeave(ev serf.MemberEvent) {
-	if ev.Members != nil && len(ev.Members) == 1 {
-		leaveMember := ev.Members[0]
-		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
-		flag, e := executeSqlByGorm([]string{leaveSql})
-
-		logger.Info("EventMemberLeave,current Members:", ev.Members)
-		logLT := ""
-		logT := time.Now().Format("2006-01-02 15:04:05")
-		logSql := strings.ReplaceAll(leaveSql, "'", "''")
-		logResult := "0"
-		if flag {
-			logResult = "1"
-		}
-		logErr := ""
-		if e != nil {
-			logErr = e.Error()
-		}
-		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
-	}
-}
-
-func HandleEventMemberJoin(ev serf.MemberEvent) {
-	if ev.Members != nil && len(ev.Members) == 1 {
-		leaveMember := ev.Members[0]
-		joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
-		flag, e := executeSqlByGorm([]string{joinSql})
-
-		logger.Info("EventMemberJoin,current Members:", ev.Members)
-		logLT := ""
-		logT := time.Now().Format("2006-01-02 15:04:05")
-		logSql := strings.ReplaceAll(joinSql, "'", "''")
-		logResult := "0"
-		if flag {
-			logResult = "1"
-		}
-		logErr := ""
-		if e != nil {
-			logErr = e.Error()
-		}
-		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
-	}
-}
+package serf
+
+import (
+	"basic.com/valib/logger.git"
+	"basic.com/valib/serf.git/serf"
+	"encoding/json"
+	"github.com/golang/protobuf/proto"
+	"github.com/hashicorp/memberlist"
+	"github.com/satori/go.uuid"
+	"path/filepath"
+	"reflect"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+	"vamicro/config"
+	"vamicro/system-service/bhome_msg_dev"
+)
+
+type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error)
+
+var rpcHandlers map[string]RpcHandle
+
+func init() {
+	rpcHandlers = make(map[string]RpcHandle)
+}
+
+// RegisterRpcHandles
+func RegisterRpcHandles(fs ...RpcHandle) {
+	for _, f := range fs {
+		name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName
+		nameEnd := filepath.Ext(name)
+		name = strings.TrimPrefix(nameEnd, ".")
+		if _, ok := rpcHandlers[name]; !ok {
+			rpcHandlers[name] = f
+		}
+	}
+}
+
+/*****************************UserEvent***************************************/
+func HandleUserEventSyncSql(ev serf.UserEvent) {
+	logger.Info("receive a UserEventSyncSql event")
+	var sqlUe SqlUserEvent
+	err := json.Unmarshal(ev.Payload, &sqlUe)
+	if err != nil {
+		logger.Error("sqlUe unmarshal err:", err)
+		return
+	}
+
+	logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
+	if sqlUe.Owner != config.Server.AnalyServerId {
+		go func() {
+			flag, e := executeSqlByGorm(sqlUe.Sql)
+			logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
+			logLT := strconv.Itoa(int(ev.LTime))
+			logT := time.Now().Format("2006-01-02 15:04:05")
+			logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
+			logResult := "0"
+			if flag {
+				logResult = "1"
+			}
+			logErr := ""
+			if e != nil {
+				logErr = e.Error()
+			}
+			executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"})
+		}()
+	}
+}
+
+func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) {
+	logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
+	SyncDbTablePersonCacheChan <- ev.Payload
+}
+
+func HandleUserEventSyncVirtualIp(ev serf.UserEvent) {
+	logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
+	SyncVirtualIpChan <- ev.Payload
+}
+
+//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑
+func HandleSyncRegisterInfo(ev serf.UserEvent) {
+	logger.Debug("HandleSyncRegisterInfo")
+	var si bhome_msg_dev.MsgDevRegisterInfo
+	if err := proto.Unmarshal(ev.Payload, &si); err == nil {
+		logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId)
+		if string(si.DevId) != config.Server.AnalyServerId {
+			compareRPool(&si)
+		}
+	} else {
+		logger.Error("HandleSyncRegisterInfo unmarshal err:", err)
+	}
+}
+
+func HandleDataSystemSerfSub(ev serf.UserEvent) {
+	h := GetBusHandle()
+	if h == nil {
+		logger.Error("HandleDataSystemSerfSub bus handle is nil")
+		return
+	}
+	err := h.Publish(DataSystemSerfSubscribe, ev.Payload)
+	if err != nil {
+		logger.Error("HandleDataSystemSerfSub pub err:", err)
+	}
+}
+
+/*****************************Query***************************************/
+func HandleQueryEventUpdateDBData(ev *serf.Query) {
+	logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId)
+	var fromP QueryTableDataParam
+	err := json.Unmarshal(ev.Payload, &fromP)
+	if err != nil {
+		logger.Error("Query tableNames unmarshal err")
+		if err := ev.Respond([]byte("request unmarshal err")); err != nil {
+			logger.Error("query.Respond err: %s\n", err)
+			return
+		}
+
+		return
+	}
+	logger.Info("Query tableNames:", fromP.Tables)
+	datas, err := DumpTables(fromP.Tables)
+	if err != nil {
+		logger.Error("queryByGorm err:", err)
+		if err := ev.Respond([]byte("queryByGorm err")); err != nil {
+			logger.Error("query.Respond err: %s\n", err)
+			return
+		}
+		return
+	}
+	bytesReturn, err := json.Marshal(datas)
+	logger.Info("results.len: ", len(bytesReturn))
+
+	var targetNode *memberlist.Node
+	nodes := Agent.Serf().Memberlist().Members()
+	if nodes != nil && len(nodes) > 0 {
+		for _, n := range nodes {
+			if n.Name == fromP.From {
+				targetNode = n
+				break
+			}
+		}
+	}
+	logger.Debug("targetNode:", targetNode.Name)
+	if targetNode != nil {
+		go func() {
+			addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
+			sendErr := rawSendTcpMsg(addr, bytesReturn)
+
+			logLT := strconv.Itoa(int(ev.LTime))
+			logT := time.Now().Format("2006-01-02 15:04:05")
+			logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''")
+			logResult := "0"
+			logErr := ""
+			if sendErr == nil {
+				logResult = "1"
+				logger.Debug("sendToTcp success")
+			} else {
+				logErr = sendErr.Error()
+				logger.Debug("sendToTcp err:", sendErr)
+			}
+
+			executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"})
+		}()
+	} else {
+		logger.Debug("targetNode is nil")
+	}
+}
+
+//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰
+func HandleOtherQuery(ev *serf.Query) {
+	var reqBody RequestSerfTopicMsg
+	var ret []byte
+	if err := json.Unmarshal(ev.Payload, &reqBody); err != nil {
+		ret = []byte(err.Error())
+	} else {
+		if err, data := QueryLocalProc(reqBody); err != nil {
+			ret = []byte(err.Error())
+		} else {
+			b, e := json.Marshal(data)
+			if e != nil {
+				ret = []byte(e.Error())
+			} else {
+				ret = b
+			}
+		}
+	}
+
+	if err := ev.Respond(ret); err != nil {
+		logger.Debug("HandleOtherQuery err:", err)
+		return
+	}
+}
+
+func HandleQueryRpc(ev *serf.Query) {
+	var ret []byte
+	var arg RpcParamTopic
+	err := json.Unmarshal(ev.Payload, &arg)
+	if err == nil {
+		if f, ok := rpcHandlers[arg.Topic]; ok {
+			resp, e := f(arg)
+			if e == nil {
+				if data, me := json.Marshal(resp); me == nil {
+					ret = data
+				} else {
+					logger.Debug("marshal resp err:", e)
+				}
+			} else {
+				logger.Debug("call f err:", e)
+			}
+		} else {
+			logger.Debug("rpcHandlers not contains topic:", arg.Topic)
+		}
+	} else {
+		logger.Debug("unmarshal RpcParamTopic err:", err)
+	}
+	if rErr := ev.Respond(ret); rErr != nil {
+		logger.Debug("HandleQueryRpc err:", rErr)
+	}
+}
+
+func HandleEventMemberLeave(ev serf.MemberEvent) {
+	if ev.Members != nil && len(ev.Members) == 1 {
+		leaveMember := ev.Members[0]
+		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
+		flag, e := executeSqlByGorm([]string{leaveSql})
+
+		logger.Info("EventMemberLeave,current Members:", ev.Members)
+		logLT := ""
+		logT := time.Now().Format("2006-01-02 15:04:05")
+		logSql := strings.ReplaceAll(leaveSql, "'", "''")
+		logResult := "0"
+		if flag {
+			logResult = "1"
+		}
+		logErr := ""
+		if e != nil {
+			logErr = e.Error()
+		}
+		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
+	}
+}
+
+func HandleEventMemberJoin(ev serf.MemberEvent) {
+	if ev.Members != nil && len(ev.Members) == 1 {
+		leaveMember := ev.Members[0]
+		joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		flag, e := executeSqlByGorm([]string{joinSql})
+
+		logger.Info("EventMemberJoin,current Members:", ev.Members)
+		logLT := ""
+		logT := time.Now().Format("2006-01-02 15:04:05")
+		logSql := strings.ReplaceAll(joinSql, "'", "''")
+		logResult := "0"
+		if flag {
+			logResult = "1"
+		}
+		logErr := ""
+		if e != nil {
+			logErr = e.Error()
+		}
+		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
+	}
+}

--
Gitblit v1.8.0