From 95a390eeb4876315acb41fa0009c65d8c4c9699d Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期三, 06 一月 2021 18:41:28 +0800
Subject: [PATCH] 修改GetNetNodeByTopic从bhomecenter中实时获取主题对应的NetNode列表
---
micronode.go | 8 ++--
hbusc.go | 60 ++++++++++++++++-------------
2 files changed, 37 insertions(+), 31 deletions(-)
diff --git a/hbusc.go b/hbusc.go
index 8581eb7..f558b23 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -360,42 +360,48 @@
//鑾峰彇topic瀵瑰簲鐨刱ey
//濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key
//濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key
-func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
+func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
h.mtxNode.Lock()
defer h.mtxNode.Unlock()
var nodes []bhomebus.NetNode
- if h.nodes != nil {
- for _,n := range h.nodes {
- if serverId != "" { //鑾峰彇鎸囧畾鑺傜偣鐨�
- if n.SvrInfo.ID == serverId {
- if k,ok := n.Topic2Key[topic];ok {
- nodes = append(nodes, bhomebus.NetNode{
- IPHost:n.SvrInfo.IP,
- Port:n.SvrInfo.Port,
- Key:k,
- })
- }
- }
- } else { //鑾峰彇鎵�鏈夎妭鐐圭殑
- if k,ok := n.Topic2Key[topic];ok {
- nodes = append(nodes, bhomebus.NetNode{
- IPHost:n.SvrInfo.IP,
- Port:n.SvrInfo.Port,
- Key:k,
- })
- }
- }
+ reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+ Key: h.sockWorker.peer,
+ })
+ reqD,err := json.Marshal(MsgInfo{
+ SrcProc: *srcProc,
+ MsgType: MesgType_ReqRep,
+ Topic: TOPIC_QUERYTOPIC,
+ Body: []byte(topic),
+ })
+ if err != nil {
+ return nil, fmt.Errorf("marshal req err:%s", err.Error())
+ }
+ var ret []bhomebus.Mesg
+ n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
+ if n > 0 {
+ var reply CommonReply
+ err = json.Unmarshal(ret[0].Data, &reply)
+ if err != nil {
+ return nil, err
}
+ if reply.Status == REPLY_SUCCESS {
+ err = json.Unmarshal(reply.Body, &nodes)
+ if err == nil {
+ return nodes, nil
+ } else {
+ return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error())
+ }
+ } else {
+ return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status)
+ }
+ } else {
+ return nil, fmt.Errorf("GetNetNodeByTopic n:%d", n)
}
- if len(nodes) == 0 {
- return nil,fmt.Errorf("topic not found in nodes")
- }
- return nodes, nil
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
//1.棣栧厛闇�瑕侀�氳繃topic鎷垮埌鏈満瀵瑰簲鐨凬etNode
- rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
+ rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
if err != nil {
return nil, err
}
diff --git a/micronode.go b/micronode.go
index fd4ea48..75b3ffe 100644
--- a/micronode.go
+++ b/micronode.go
@@ -170,8 +170,8 @@
}
//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
if err != nil {
return nil
}
@@ -179,8 +179,8 @@
}
//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
if err != nil {
return nil
}
--
Gitblit v1.8.0