liuxiaolong
2021-01-06 95a390eeb4876315acb41fa0009c65d8c4c9699d
修改GetNetNodeByTopic从bhomecenter中实时获取主题对应的NetNode列表
2个文件已修改
68 ■■■■ 已修改文件
hbusc.go 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -360,42 +360,48 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
    h.mtxNode.Lock()
    defer h.mtxNode.Unlock()
    var nodes []bhomebus.NetNode
    if h.nodes != nil {
        for _,n := range h.nodes {
            if serverId != "" { //获取指定节点的
                if n.SvrInfo.ID == serverId {
                    if k,ok := n.Topic2Key[topic];ok {
                        nodes = append(nodes, bhomebus.NetNode{
                            IPHost:n.SvrInfo.IP,
                            Port:n.SvrInfo.Port,
                            Key:k,
                        })
                    }
                }
            } else { //获取所有节点的
                if k,ok := n.Topic2Key[topic];ok {
                    nodes = append(nodes, bhomebus.NetNode{
                        IPHost:n.SvrInfo.IP,
                        Port:n.SvrInfo.Port,
                        Key:k,
                    })
                }
            }
    reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: h.sockWorker.peer,
    })
    reqD,err := json.Marshal(MsgInfo{
        SrcProc: *srcProc,
        MsgType: MesgType_ReqRep,
        Topic:   TOPIC_QUERYTOPIC,
        Body:    []byte(topic),
    })
    if err != nil {
        return nil, fmt.Errorf("marshal req err:%s", err.Error())
    }
    var ret []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
    if n > 0 {
        var reply CommonReply
        err = json.Unmarshal(ret[0].Data, &reply)
        if err != nil {
            return nil, err
        }
        if reply.Status == REPLY_SUCCESS {
            err = json.Unmarshal(reply.Body, &nodes)
            if err == nil {
                return nodes, nil
            } else {
                return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error())
            }
        } else {
            return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status)
        }
    } else {
        return nil,    fmt.Errorf("GetNetNodeByTopic n:%d", n)
    }
    if len(nodes) == 0 {
        return nil,fmt.Errorf("topic not found in nodes")
    }
    return nodes, nil
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
    //1.首先需要通过topic拿到本机对应的NetNode
    rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
    rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
    if err != nil {
        return nil, err
    }
micronode.go
@@ -170,8 +170,8 @@
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
    if err != nil {
        return nil
    }
@@ -179,8 +179,8 @@
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
    if err != nil {
        return nil
    }