修改GetNetNodeByTopic,改为从bhomecenter中实时获取主题对应的NetNode列表
| | |
| | | //获取topic对应的key |
| | | //如果serverId不为空,则获取指定机器上的topic-key |
| | | //如果serverId为空,则获取所有节点上的topic-key |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) { |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) { |
| | | h.mtxNode.Lock() |
| | | defer h.mtxNode.Unlock() |
| | | var nodes []bhomebus.NetNode |
| | | if h.nodes != nil { |
| | | for _,n := range h.nodes { |
| | | if serverId != "" { //获取指定节点的 |
| | | if n.SvrInfo.ID == serverId { |
| | | if k,ok := n.Topic2Key[topic];ok { |
| | | nodes = append(nodes, bhomebus.NetNode{ |
| | | IPHost:n.SvrInfo.IP, |
| | | Port:n.SvrInfo.Port, |
| | | Key:k, |
| | | }) |
| | | } |
| | | } |
| | | } else { //获取所有节点的 |
| | | if k,ok := n.Topic2Key[topic];ok { |
| | | nodes = append(nodes, bhomebus.NetNode{ |
| | | IPHost:n.SvrInfo.IP, |
| | | Port:n.SvrInfo.Port, |
| | | Key:k, |
| | | }) |
| | | } |
| | | } |
| | | reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: h.sockWorker.peer, |
| | | }) |
| | | reqD,err := json.Marshal(MsgInfo{ |
| | | SrcProc: *srcProc, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_QUERYTOPIC, |
| | | Body: []byte(topic), |
| | | }) |
| | | if err != nil { |
| | | return nil, fmt.Errorf("marshal req err:%s", err.Error()) |
| | | } |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) |
| | | if n > 0 { |
| | | var reply CommonReply |
| | | err = json.Unmarshal(ret[0].Data, &reply) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if reply.Status == REPLY_SUCCESS { |
| | | err = json.Unmarshal(reply.Body, &nodes) |
| | | if err == nil { |
| | | return nodes, nil |
| | | } else { |
| | | return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("GetNetNodeByTopic n:%d", n) |
| | | } |
| | | if len(nodes) == 0 { |
| | | return nil,fmt.Errorf("topic not found in nodes") |
| | | } |
| | | return nodes, nil |
| | | } |
| | | |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) { |
| | | //1.首先需要通过topic拿到本机对应的NetNode |
| | | rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic) |
| | | rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | |
| | | } |
| | | |
| | | //获取本机中某一个主题的 key (结果只有一个元素) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | |
| | | } |
| | | |
| | | //获取集群中所有节点上某个主题的key信息(结果可能有多个) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) |
| | | if err != nil { |
| | | return nil |
| | | } |