From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 05 九月 2023 09:58:13 +0800 Subject: [PATCH] 修复编译 --- system-service/serf/proxy.go | 492 +++++++++++++++++++++++++++--------------------------- 1 files changed, 246 insertions(+), 246 deletions(-) diff --git a/system-service/serf/proxy.go b/system-service/serf/proxy.go index 1396ebd..a2c71b6 100644 --- a/system-service/serf/proxy.go +++ b/system-service/serf/proxy.go @@ -1,246 +1,246 @@ -package serf - -import ( - "basic.com/valib/bhomeclient.git" - "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg" - "basic.com/valib/logger.git" - "basic.com/valib/serf.git/serf" - "encoding/json" - "errors" - "github.com/golang/protobuf/proto" - "reflect" - "strconv" - "sync" - "time" - "vamicro/config" - "vamicro/extend/util" - "vamicro/system-service/bhome_msg_dev" -) - -type RequestSerfTopicMsg struct { - Topic string `json:"topic"` - Msg []byte `json:"msg"` - targetNodeIds []string `json:"targetNodeIds"` - timeout time.Duration -} - -type RequestSerfTopicResp struct { - DevId string `json:"devId"` - DevIp string `json:"devIp"` - Topic string `json:"topic"` - Procs []TopicProc `json:"procs"` -} - -type TopicProc struct { - ProcId string `json:"procId"` - Addr bhome_msg.BHAddress `json:"addr"` -} - -//閫氳繃topic鑾峰彇鎸囧畾topic鐨勬墍鏈夎妭鐐�,渚沚us浣跨敤 -func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) { - req := RequestSerfTopicMsg{ - Topic: QueryNodesByTopic, - timeout: time.Second * 10, - } - err, data := QuerySerfNodes(req) - if err != nil { - return nil, err - } - var result []RequestSerfTopicResp - if len(data) > 0 { - for _, d := range data { - var arr []RequestSerfTopicResp - if err = json.Unmarshal(d, &arr); err != nil { - logger.Debug("json Unmarshal err:", err) - } else { - result = append(result, arr...) - } - } - } - - return result, nil -} - -//鍝嶅簲鏍规嵁topic鑾峰彇鍦板潃 -func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) { - clients := hms.GetLocalNetNodeByTopic(topic) - if len(clients) == 0 { - return nil, errors.New("topic not exist") - } else { - ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) - var destArr []TopicProc - for idx, c := range clients { - tp := TopicProc{ - ProcId: string(c.ProcId), - Addr: *(clients[idx].Addr), - } - //if c. != nil { - // for _,p := range cli.Service.TopicList { - // if topic == string(p) { - // tps = append(tps, TopicProc{ - // ProcId: string(cli.Proc.ProcId), - // ProcName: string(cli.Proc.Name), - // }) - // } - // } - //} - destArr = append(destArr, tp) - } - r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{ - Topic: topic, - DevId: config.Server.AnalyServerId, - DevIp: ipv4, - Procs: destArr, - }) - return r, nil - } -} - -//浠g悊璇锋眰锛岄�氳繃褰撳墠鐨剆erf鍙戦�佺粰鍏朵粬鐨剆erf鑺傜偣 -func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) { - resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{ - FilterNodes: req.targetNodeIds, - }) - if err != nil { - return err, nil - } - - var data [][]byte - var retE error - after := time.After(req.timeout) -loop: - for { - select { - case r := <-resp.ResponseCh(): - data = append(data, r.Payload) - case <-after: - //retE = errors.New("time out") - break loop - } - } - - return retE, data -} - -//serf鎺ユ敹鍒皅uery娑堟伅鍚庨渶瑕佽闂湰鍦拌繘绋嬭幏寰楃粨鏋滃苟鍙嶉 -func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) { - br := bhomeclient.Request{ - Path: req.Topic, - Body: req.Msg, - } - reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000) - if err != nil { - return err, nil - } - return nil, reply -} - -var ( - riPool map[string]bhome_msg_dev.MsgDevRegisterInfo - topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center" - riLock sync.Mutex -) - -func init() { - riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo) -} - -//浠巗erf涓幏鍙栨敞鍐屼腑蹇冪殑娉ㄥ唽淇℃伅鏄惁鍙樺寲锛屽苟灏嗛泦缇や腑鎵�鏈夋敞鍐屼俊鎭眹鎬� -func DoSyncRegisterInfo() error { - if hms == nil || Agent == nil { - return errors.New("ms or Agent handle is nil") - } - infos, err := hms.GetRegisteredClient() - if err != nil { - return err - } - ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) - ri := bhome_msg_dev.MsgDevRegisterInfo{ - DevId: []byte(config.Server.AnalyServerId), - Ip: []byte(ipv4), - ProxyPort: 4020, - } - for _, i := range infos { - if i.Online { - mta := bhome_msg_dev.MsgProcTopicsAll{ - Online: i.Online, - } - if i.Proc != nil { - mta.Proc = &bhome_msg_dev.ProcInfo{ - ProcId: i.Proc.ProcId, - Name: i.Proc.Name, - PublicInfo: i.Proc.PublicInfo, - PrivateInfo: i.Proc.PrivateInfo, - } - //proc鐨勭鍙d俊鎭斁鍦≒ublicInfo涓� - if string(mta.Proc.PublicInfo) != "" { - if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 { - mta.Port = int32(iPort) - } - } - } - if i.Topics != nil { - mta.TopicList = &bhome_msg_dev.MsgTopicList{ - TopicList: i.Topics.TopicList, - } - } - if i.SubLocalTopics != nil { - mta.LocalSublist = &bhome_msg_dev.MsgTopicList{ - TopicList: i.SubLocalTopics.TopicList, - } - } - if i.SubNetTopics != nil { - mta.NetSublist = &bhome_msg_dev.MsgTopicList{ - TopicList: i.SubNetTopics.TopicList, - } - } - ri.ProcInfos = append(ri.ProcInfos, &mta) - } - } - - riLock.Lock() - bs := false //鍒ゆ柇鏄惁鏇存敼锛屽鏋滃彂鐢熸洿鏀瑰垯闇�瑕侀噸鏂板彂甯冨埌center涓� - if v, ok := riPool[string(ri.DevId)]; !ok { - bs = true - riPool[string(ri.DevId)] = ri - } else { - if !reflect.DeepEqual(v, ri) { - bs = true - riPool[string(ri.DevId)] = ri - } - } - riLock.Unlock() - - if bs { - bts, err := proto.Marshal(&ri) - if err != nil { - logger.Debug("marshal riPool err:", err) - return err - } - - logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts)) - Agent.UserEvent(UserEventSyncRegisterInfo, bts, false) - } - return nil -} - -func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) { - logger.Debug("Enter compareRPool") - riLock.Lock() - riPool[string(ri.DevId)] = *ri - riLock.Unlock() - - b, _ := proto.Marshal(ri) - - logger.Debug("before hms.Publish to topicPubAllRegisterInfo") - err := hms.Publish(topicPubAllRegisterInfo, b) - if err != nil { - logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err) - } else { - logger.Debug("hms.Publish success") - } -} - -func SyncProxy(topic string, body []byte) error { - return Agent.UserEvent(topic, body, false) -} +package serf + +import ( + "basic.com/valib/bhomeclient.git" + "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg" + "basic.com/valib/logger.git" + "basic.com/valib/serf.git/serf" + "encoding/json" + "errors" + "github.com/golang/protobuf/proto" + "reflect" + "strconv" + "sync" + "time" + "vamicro/config" + "vamicro/extend/util" + "vamicro/system-service/bhome_msg_dev" +) + +type RequestSerfTopicMsg struct { + Topic string `json:"topic"` + Msg []byte `json:"msg"` + targetNodeIds []string `json:"targetNodeIds"` + timeout time.Duration +} + +type RequestSerfTopicResp struct { + DevId string `json:"devId"` + DevIp string `json:"devIp"` + Topic string `json:"topic"` + Procs []TopicProc `json:"procs"` +} + +type TopicProc struct { + ProcId string `json:"procId"` + Addr bhome_msg.BHAddress `json:"addr"` +} + +//閫氳繃topic鑾峰彇鎸囧畾topic鐨勬墍鏈夎妭鐐�,渚沚us浣跨敤 +func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) { + req := RequestSerfTopicMsg{ + Topic: QueryNodesByTopic, + timeout: time.Second * 10, + } + err, data := QuerySerfNodes(req) + if err != nil { + return nil, err + } + var result []RequestSerfTopicResp + if len(data) > 0 { + for _, d := range data { + var arr []RequestSerfTopicResp + if err = json.Unmarshal(d, &arr); err != nil { + logger.Debug("json Unmarshal err:", err) + } else { + result = append(result, arr...) + } + } + } + + return result, nil +} + +//鍝嶅簲鏍规嵁topic鑾峰彇鍦板潃 +func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) { + clients := hms.GetLocalNetNodeByTopic(topic) + if len(clients) == 0 { + return nil, errors.New("topic not exist") + } else { + ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) + var destArr []TopicProc + for idx, c := range clients { + tp := TopicProc{ + ProcId: string(c.ProcId), + Addr: *(clients[idx].Addr), + } + //if c. != nil { + // for _,p := range cli.Service.TopicList { + // if topic == string(p) { + // tps = append(tps, TopicProc{ + // ProcId: string(cli.Proc.ProcId), + // ProcName: string(cli.Proc.Name), + // }) + // } + // } + //} + destArr = append(destArr, tp) + } + r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{ + Topic: topic, + DevId: config.Server.AnalyServerId, + DevIp: ipv4, + Procs: destArr, + }) + return r, nil + } +} + +//浠g悊璇锋眰锛岄�氳繃褰撳墠鐨剆erf鍙戦�佺粰鍏朵粬鐨剆erf鑺傜偣 +func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) { + resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{ + FilterNodes: req.targetNodeIds, + }) + if err != nil { + return err, nil + } + + var data [][]byte + var retE error + after := time.After(req.timeout) +loop: + for { + select { + case r := <-resp.ResponseCh(): + data = append(data, r.Payload) + case <-after: + //retE = errors.New("time out") + break loop + } + } + + return retE, data +} + +//serf鎺ユ敹鍒皅uery娑堟伅鍚庨渶瑕佽闂湰鍦拌繘绋嬭幏寰楃粨鏋滃苟鍙嶉 +func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) { + br := bhomeclient.Request{ + Path: req.Topic, + Body: req.Msg, + } + reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000) + if err != nil { + return err, nil + } + return nil, reply +} + +var ( + riPool map[string]bhome_msg_dev.MsgDevRegisterInfo + topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center" + riLock sync.Mutex +) + +func init() { + riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo) +} + +//浠巗erf涓幏鍙栨敞鍐屼腑蹇冪殑娉ㄥ唽淇℃伅鏄惁鍙樺寲锛屽苟灏嗛泦缇や腑鎵�鏈夋敞鍐屼俊鎭眹鎬� +func DoSyncRegisterInfo() error { + if hms == nil || Agent == nil { + return errors.New("ms or Agent handle is nil") + } + infos, err := hms.GetRegisteredClient() + if err != nil { + return err + } + ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) + ri := bhome_msg_dev.MsgDevRegisterInfo{ + DevId: []byte(config.Server.AnalyServerId), + Ip: []byte(ipv4), + ProxyPort: 4020, + } + for _, i := range infos { + if i.Online { + mta := bhome_msg_dev.MsgProcTopicsAll{ + Online: i.Online, + } + if i.Proc != nil { + mta.Proc = &bhome_msg_dev.ProcInfo{ + ProcId: i.Proc.ProcId, + Name: i.Proc.Name, + PublicInfo: i.Proc.PublicInfo, + PrivateInfo: i.Proc.PrivateInfo, + } + //proc鐨勭鍙d俊鎭斁鍦≒ublicInfo涓� + if string(mta.Proc.PublicInfo) != "" { + if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 { + mta.Port = int32(iPort) + } + } + } + if i.Topics != nil { + mta.TopicList = &bhome_msg_dev.MsgTopicList{ + TopicList: i.Topics.TopicList, + } + } + if i.SubLocalTopics != nil { + mta.LocalSublist = &bhome_msg_dev.MsgTopicList{ + TopicList: i.SubLocalTopics.TopicList, + } + } + if i.SubNetTopics != nil { + mta.NetSublist = &bhome_msg_dev.MsgTopicList{ + TopicList: i.SubNetTopics.TopicList, + } + } + ri.ProcInfos = append(ri.ProcInfos, &mta) + } + } + + riLock.Lock() + bs := false //鍒ゆ柇鏄惁鏇存敼锛屽鏋滃彂鐢熸洿鏀瑰垯闇�瑕侀噸鏂板彂甯冨埌center涓� + if v, ok := riPool[string(ri.DevId)]; !ok { + bs = true + riPool[string(ri.DevId)] = ri + } else { + if !reflect.DeepEqual(v, ri) { + bs = true + riPool[string(ri.DevId)] = ri + } + } + riLock.Unlock() + + if bs { + bts, err := proto.Marshal(&ri) + if err != nil { + logger.Debug("marshal riPool err:", err) + return err + } + + logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts)) + Agent.UserEvent(UserEventSyncRegisterInfo, bts, false) + } + return nil +} + +func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) { + logger.Debug("Enter compareRPool") + riLock.Lock() + riPool[string(ri.DevId)] = *ri + riLock.Unlock() + + b, _ := proto.Marshal(ri) + + logger.Debug("before hms.Publish to topicPubAllRegisterInfo") + err := hms.Publish(topicPubAllRegisterInfo, b) + if err != nil { + logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err) + } else { + logger.Debug("hms.Publish success") + } +} + +func SyncProxy(topic string, body []byte) error { + return Agent.UserEvent(topic, body, false) +} -- Gitblit v1.8.0