liuxiaolong
2021-01-06 95a390eeb4876315acb41fa0009c65d8c4c9699d
hbusc.go
@@ -360,42 +360,48 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
   h.mtxNode.Lock()
   defer h.mtxNode.Unlock()
   var nodes []bhomebus.NetNode
   if h.nodes != nil {
      for _,n := range h.nodes {
         if serverId != "" { //获取指定节点的
            if n.SvrInfo.ID == serverId {
               if k,ok := n.Topic2Key[topic];ok {
                  nodes = append(nodes, bhomebus.NetNode{
                     IPHost:n.SvrInfo.IP,
                     Port:n.SvrInfo.Port,
                     Key:k,
                  })
               }
            }
         } else { //获取所有节点的
            if k,ok := n.Topic2Key[topic];ok {
               nodes = append(nodes, bhomebus.NetNode{
                  IPHost:n.SvrInfo.IP,
                  Port:n.SvrInfo.Port,
                  Key:k,
               })
            }
         }
   reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: h.sockWorker.peer,
   })
   reqD,err := json.Marshal(MsgInfo{
      SrcProc: *srcProc,
      MsgType: MesgType_ReqRep,
      Topic:   TOPIC_QUERYTOPIC,
      Body:    []byte(topic),
   })
   if err != nil {
      return nil, fmt.Errorf("marshal req err:%s", err.Error())
   }
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
   if n > 0 {
      var reply CommonReply
      err = json.Unmarshal(ret[0].Data, &reply)
      if err != nil {
         return nil, err
      }
      if reply.Status == REPLY_SUCCESS {
         err = json.Unmarshal(reply.Body, &nodes)
         if err == nil {
            return nodes, nil
         } else {
            return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error())
         }
      } else {
         return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status)
      }
   } else {
      return nil,   fmt.Errorf("GetNetNodeByTopic n:%d", n)
   }
   if len(nodes) == 0 {
      return nil,fmt.Errorf("topic not found in nodes")
   }
   return nodes, nil
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
   //1.首先需要通过topic拿到本机对应的NetNode
   rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
   rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
   if err != nil {
      return nil, err
   }