From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 05 九月 2023 09:58:13 +0800 Subject: [PATCH] 修复编译 --- system-service/serf/handler.go | 528 +++++++++++++++++++++++++++++----------------------------- 1 files changed, 264 insertions(+), 264 deletions(-) diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go index a6fc49b..6c72b22 100644 --- a/system-service/serf/handler.go +++ b/system-service/serf/handler.go @@ -1,264 +1,264 @@ -package serf - -import ( - "basic.com/valib/logger.git" - "basic.com/valib/serf.git/serf" - "encoding/json" - "github.com/golang/protobuf/proto" - "github.com/hashicorp/memberlist" - "github.com/satori/go.uuid" - "path/filepath" - "reflect" - "runtime" - "strconv" - "strings" - "time" - "vamicro/config" - "vamicro/system-service/bhome_msg_dev" -) - -type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) - -var rpcHandlers map[string]RpcHandle - -func init() { - rpcHandlers = make(map[string]RpcHandle) -} - -// RegisterRpcHandles -func RegisterRpcHandles(fs ...RpcHandle) { - for _, f := range fs { - name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName - nameEnd := filepath.Ext(name) - name = strings.TrimPrefix(nameEnd, ".") - if _, ok := rpcHandlers[name]; !ok { - rpcHandlers[name] = f - } - } -} - -/*****************************UserEvent***************************************/ -func HandleUserEventSyncSql(ev serf.UserEvent) { - logger.Info("receive a UserEventSyncSql event") - var sqlUe SqlUserEvent - err := json.Unmarshal(ev.Payload, &sqlUe) - if err != nil { - logger.Error("sqlUe unmarshal err:", err) - return - } - - logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql) - if sqlUe.Owner != config.Server.AnalyServerId { - go func() { - flag, e := executeSqlByGorm(sqlUe.Sql) - logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e) - logLT := strconv.Itoa(int(ev.LTime)) - logT := time.Now().Format("2006-01-02 15:04:05") - logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''") - logResult := "0" - if flag { - logResult = "1" - } - logErr := "" - if e != nil { - logErr = e.Error() - } - executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"}) - }() - } -} - -func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) { - logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload)) - SyncDbTablePersonCacheChan <- ev.Payload -} - -func HandleUserEventSyncVirtualIp(ev serf.UserEvent) { - logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") - SyncVirtualIpChan <- ev.Payload -} - -//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 -func HandleSyncRegisterInfo(ev serf.UserEvent) { - logger.Debug("HandleSyncRegisterInfo") - var si bhome_msg_dev.MsgDevRegisterInfo - if err := proto.Unmarshal(ev.Payload, &si); err == nil { - logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId) - if string(si.DevId) != config.Server.AnalyServerId { - compareRPool(&si) - } - } else { - logger.Error("HandleSyncRegisterInfo unmarshal err:", err) - } -} - -func HandleDataSystemSerfSub(ev serf.UserEvent) { - h := GetBusHandle() - if h == nil { - logger.Error("HandleDataSystemSerfSub bus handle is nil") - return - } - err := h.Publish(DataSystemSerfSubscribe, ev.Payload) - if err != nil { - logger.Error("HandleDataSystemSerfSub pub err:", err) - } -} - -/*****************************Query***************************************/ -func HandleQueryEventUpdateDBData(ev *serf.Query) { - logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId) - var fromP QueryTableDataParam - err := json.Unmarshal(ev.Payload, &fromP) - if err != nil { - logger.Error("Query tableNames unmarshal err") - if err := ev.Respond([]byte("request unmarshal err")); err != nil { - logger.Error("query.Respond err: %s\n", err) - return - } - - return - } - logger.Info("Query tableNames:", fromP.Tables) - datas, err := DumpTables(fromP.Tables) - if err != nil { - logger.Error("queryByGorm err:", err) - if err := ev.Respond([]byte("queryByGorm err")); err != nil { - logger.Error("query.Respond err: %s\n", err) - return - } - return - } - bytesReturn, err := json.Marshal(datas) - logger.Info("results.len: ", len(bytesReturn)) - - var targetNode *memberlist.Node - nodes := Agent.Serf().Memberlist().Members() - if nodes != nil && len(nodes) > 0 { - for _, n := range nodes { - if n.Name == fromP.From { - targetNode = n - break - } - } - } - logger.Debug("targetNode:", targetNode.Name) - if targetNode != nil { - go func() { - addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) - sendErr := rawSendTcpMsg(addr, bytesReturn) - - logLT := strconv.Itoa(int(ev.LTime)) - logT := time.Now().Format("2006-01-02 15:04:05") - logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''") - logResult := "0" - logErr := "" - if sendErr == nil { - logResult = "1" - logger.Debug("sendToTcp success") - } else { - logErr = sendErr.Error() - logger.Debug("sendToTcp err:", sendErr) - } - - executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"}) - }() - } else { - logger.Debug("targetNode is nil") - } -} - -//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 -func HandleOtherQuery(ev *serf.Query) { - var reqBody RequestSerfTopicMsg - var ret []byte - if err := json.Unmarshal(ev.Payload, &reqBody); err != nil { - ret = []byte(err.Error()) - } else { - if err, data := QueryLocalProc(reqBody); err != nil { - ret = []byte(err.Error()) - } else { - b, e := json.Marshal(data) - if e != nil { - ret = []byte(e.Error()) - } else { - ret = b - } - } - } - - if err := ev.Respond(ret); err != nil { - logger.Debug("HandleOtherQuery err:", err) - return - } -} - -func HandleQueryRpc(ev *serf.Query) { - var ret []byte - var arg RpcParamTopic - err := json.Unmarshal(ev.Payload, &arg) - if err == nil { - if f, ok := rpcHandlers[arg.Topic]; ok { - resp, e := f(arg) - if e == nil { - if data, me := json.Marshal(resp); me == nil { - ret = data - } else { - logger.Debug("marshal resp err:", e) - } - } else { - logger.Debug("call f err:", e) - } - } else { - logger.Debug("rpcHandlers not contains topic:", arg.Topic) - } - } else { - logger.Debug("unmarshal RpcParamTopic err:", err) - } - if rErr := ev.Respond(ret); rErr != nil { - logger.Debug("HandleQueryRpc err:", rErr) - } -} - -func HandleEventMemberLeave(ev serf.MemberEvent) { - if ev.Members != nil && len(ev.Members) == 1 { - leaveMember := ev.Members[0] - leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" - flag, e := executeSqlByGorm([]string{leaveSql}) - - logger.Info("EventMemberLeave,current Members:", ev.Members) - logLT := "" - logT := time.Now().Format("2006-01-02 15:04:05") - logSql := strings.ReplaceAll(leaveSql, "'", "''") - logResult := "0" - if flag { - logResult = "1" - } - logErr := "" - if e != nil { - logErr = e.Error() - } - executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) - } -} - -func HandleEventMemberJoin(ev serf.MemberEvent) { - if ev.Members != nil && len(ev.Members) == 1 { - leaveMember := ev.Members[0] - joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" - flag, e := executeSqlByGorm([]string{joinSql}) - - logger.Info("EventMemberJoin,current Members:", ev.Members) - logLT := "" - logT := time.Now().Format("2006-01-02 15:04:05") - logSql := strings.ReplaceAll(joinSql, "'", "''") - logResult := "0" - if flag { - logResult = "1" - } - logErr := "" - if e != nil { - logErr = e.Error() - } - executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) - } -} +package serf + +import ( + "basic.com/valib/logger.git" + "basic.com/valib/serf.git/serf" + "encoding/json" + "github.com/golang/protobuf/proto" + "github.com/hashicorp/memberlist" + "github.com/satori/go.uuid" + "path/filepath" + "reflect" + "runtime" + "strconv" + "strings" + "time" + "vamicro/config" + "vamicro/system-service/bhome_msg_dev" +) + +type RpcHandle func(arg RpcParamTopic) ([]serf.NodeResponse, error) + +var rpcHandlers map[string]RpcHandle + +func init() { + rpcHandlers = make(map[string]RpcHandle) +} + +// RegisterRpcHandles +func RegisterRpcHandles(fs ...RpcHandle) { + for _, f := range fs { + name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() //absolute package path name eg: a.b.c.d.FuncName + nameEnd := filepath.Ext(name) + name = strings.TrimPrefix(nameEnd, ".") + if _, ok := rpcHandlers[name]; !ok { + rpcHandlers[name] = f + } + } +} + +/*****************************UserEvent***************************************/ +func HandleUserEventSyncSql(ev serf.UserEvent) { + logger.Info("receive a UserEventSyncSql event") + var sqlUe SqlUserEvent + err := json.Unmarshal(ev.Payload, &sqlUe) + if err != nil { + logger.Error("sqlUe unmarshal err:", err) + return + } + + logger.Info("ev.LTime:", ev.LTime, "owner:", sqlUe.Owner, "sql:", sqlUe.Sql) + if sqlUe.Owner != config.Server.AnalyServerId { + go func() { + flag, e := executeSqlByGorm(sqlUe.Sql) + logger.Info("ev.LTime:", ev.LTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e) + logLT := strconv.Itoa(int(ev.LTime)) + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + sqlUe.Owner + "'," + logResult + ",'" + logErr + "')"}) + }() + } +} + +func HandleUserEventSyncDbTablePersonCache(ev serf.UserEvent) { + logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload)) + SyncDbTablePersonCacheChan <- ev.Payload +} + +func HandleUserEventSyncVirtualIp(ev serf.UserEvent) { + logger.Info("LTime:", ev.LTime, " Recevie virtualIp change") + SyncVirtualIpChan <- ev.Payload +} + +//鏀跺埌鍏跺畠鑺傜偣涓诲姩灏嗘敞鍐屼腑蹇冪殑鎵�鏈塼opic閫氱煡鍒伴泦缇や腑 +func HandleSyncRegisterInfo(ev serf.UserEvent) { + logger.Debug("HandleSyncRegisterInfo") + var si bhome_msg_dev.MsgDevRegisterInfo + if err := proto.Unmarshal(ev.Payload, &si); err == nil { + logger.Debug("HandleSyncRegisterInfo si.DevId:", string(si.DevId), " config.Server.AnalyServerId:", config.Server.AnalyServerId) + if string(si.DevId) != config.Server.AnalyServerId { + compareRPool(&si) + } + } else { + logger.Error("HandleSyncRegisterInfo unmarshal err:", err) + } +} + +func HandleDataSystemSerfSub(ev serf.UserEvent) { + h := GetBusHandle() + if h == nil { + logger.Error("HandleDataSystemSerfSub bus handle is nil") + return + } + err := h.Publish(DataSystemSerfSubscribe, ev.Payload) + if err != nil { + logger.Error("HandleDataSystemSerfSub pub err:", err) + } +} + +/*****************************Query***************************************/ +func HandleQueryEventUpdateDBData(ev *serf.Query) { + logger.Info("receive QueryEventUpdateDBData, current node:", config.Server.AnalyServerId) + var fromP QueryTableDataParam + err := json.Unmarshal(ev.Payload, &fromP) + if err != nil { + logger.Error("Query tableNames unmarshal err") + if err := ev.Respond([]byte("request unmarshal err")); err != nil { + logger.Error("query.Respond err: %s\n", err) + return + } + + return + } + logger.Info("Query tableNames:", fromP.Tables) + datas, err := DumpTables(fromP.Tables) + if err != nil { + logger.Error("queryByGorm err:", err) + if err := ev.Respond([]byte("queryByGorm err")); err != nil { + logger.Error("query.Respond err: %s\n", err) + return + } + return + } + bytesReturn, err := json.Marshal(datas) + logger.Info("results.len: ", len(bytesReturn)) + + var targetNode *memberlist.Node + nodes := Agent.Serf().Memberlist().Members() + if nodes != nil && len(nodes) > 0 { + for _, n := range nodes { + if n.Name == fromP.From { + targetNode = n + break + } + } + } + logger.Debug("targetNode:", targetNode.Name) + if targetNode != nil { + go func() { + addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) + sendErr := rawSendTcpMsg(addr, bytesReturn) + + logLT := strconv.Itoa(int(ev.LTime)) + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name, "'", "''") + logResult := "0" + logErr := "" + if sendErr == nil { + logResult = "1" + logger.Debug("sendToTcp success") + } else { + logErr = sendErr.Error() + logger.Debug("sendToTcp err:", sendErr) + } + + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + targetNode.Name + "'," + logResult + ",'" + logErr + "')"}) + }() + } else { + logger.Debug("targetNode is nil") + } +} + +//澶勭悊鍏朵粬鐨勪竴浜泀uery璇锋眰 +func HandleOtherQuery(ev *serf.Query) { + var reqBody RequestSerfTopicMsg + var ret []byte + if err := json.Unmarshal(ev.Payload, &reqBody); err != nil { + ret = []byte(err.Error()) + } else { + if err, data := QueryLocalProc(reqBody); err != nil { + ret = []byte(err.Error()) + } else { + b, e := json.Marshal(data) + if e != nil { + ret = []byte(e.Error()) + } else { + ret = b + } + } + } + + if err := ev.Respond(ret); err != nil { + logger.Debug("HandleOtherQuery err:", err) + return + } +} + +func HandleQueryRpc(ev *serf.Query) { + var ret []byte + var arg RpcParamTopic + err := json.Unmarshal(ev.Payload, &arg) + if err == nil { + if f, ok := rpcHandlers[arg.Topic]; ok { + resp, e := f(arg) + if e == nil { + if data, me := json.Marshal(resp); me == nil { + ret = data + } else { + logger.Debug("marshal resp err:", e) + } + } else { + logger.Debug("call f err:", e) + } + } else { + logger.Debug("rpcHandlers not contains topic:", arg.Topic) + } + } else { + logger.Debug("unmarshal RpcParamTopic err:", err) + } + if rErr := ev.Respond(ret); rErr != nil { + logger.Debug("HandleQueryRpc err:", rErr) + } +} + +func HandleEventMemberLeave(ev serf.MemberEvent) { + if ev.Members != nil && len(ev.Members) == 1 { + leaveMember := ev.Members[0] + leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'" + flag, e := executeSqlByGorm([]string{leaveSql}) + + logger.Info("EventMemberLeave,current Members:", ev.Members) + logLT := "" + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(leaveSql, "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} + +func HandleEventMemberJoin(ev serf.MemberEvent) { + if ev.Members != nil && len(ev.Members) == 1 { + leaveMember := ev.Members[0] + joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'" + flag, e := executeSqlByGorm([]string{joinSql}) + + logger.Info("EventMemberJoin,current Members:", ev.Members) + logLT := "" + logT := time.Now().Format("2006-01-02 15:04:05") + logSql := strings.ReplaceAll(joinSql, "'", "''") + logResult := "0" + if flag { + logResult = "1" + } + logErr := "" + if e != nil { + logErr = e.Error() + } + executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"}) + } +} -- Gitblit v1.8.0