liuxiaolong
2021-07-01 b9b16b451361341b990d3bbb78fc2d53b74202a0
hbusc.go
@@ -129,7 +129,7 @@
   handle.wg = &sync.WaitGroup{}
   //有订阅消息才需要启动协程接收消息
   if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
   if len(ri.SubTopic) > 0 {
      handle.printLog("sub topics")
      var subList bhome_msg.MsgTopicList
      for _,v := range ri.SubTopic {
@@ -137,11 +137,27 @@
      }
      var subReply bhome_msg.MsgCommonReply
      if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
         //启动订阅信息接收
         handle.wg.Add(1)
         go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
      if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
         handle.printLog("bhsgo.Subscribe ret false")
      }
   }
   if len(ri.SubNetTopic) > 0 {
      handle.printLog("sub net topics")
      var subNetList bhome_msg.MsgTopicList
      for _,v := range ri.SubNetTopic {
         subNetList.TopicList = append(subNetList.TopicList, []byte(v))
      }
      var subNetReply bhome_msg.MsgCommonReply
      if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) {
         handle.printLog("bhsgo.SubscribeNet ret false")
      }
   }
   if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 {
      //启动订阅信息接收
      handle.wg.Add(1)
      go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
   }
   return handle, nil
@@ -218,9 +234,20 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) {
   return nil, nil
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
   dest := bhome_msg.BHAddress{}
   reqTopic := bhome_msg.MsgQueryTopic{
      Topic: []byte(topic),
   }
   rep := bhome_msg.MsgQueryTopicReply{}
   if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) {
      return rep.NodeAddress, nil
   }
   if rep.Errmsg != nil {
      h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString))
      return nil, errors.New(string(rep.Errmsg.ErrString))
   }
   return nil, errors.New("bhsgo.QueryTopicAddress ret false")
}
func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
@@ -237,21 +264,26 @@
      return &reply, nil
   } else {
      i, s := bhsgo.GetLastError()
      h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic))
      return nil, errors.New("request ")
   }
}
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) {
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
   dest := bhome_msg.BHAddress{}
   if destArr != nil && len(destArr) > 0 {
      dest = destArr[0]
      if destArr[0].Addr != nil {
         dest = *(destArr[0].Addr)
      }
   }
   pid := ""
   r := bhome_msg.MsgRequestTopicReply{}
   if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
      return r.Data, nil
   } else {
      h.printLog("bhsgo.Request request err:", r.Errmsg)
      i, s := bhsgo.GetLastError()
      h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest)
      return nil, errors.New("bhsgo.Request return false")
   }
}
@@ -271,9 +303,15 @@
   return errors.New("reply return false")
}
func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) {
   return nil, errors.New("")
func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
   dest := &bhome_msg.BHAddress{}
   topic := &bhome_msg.MsgQueryProc{}
   rep := &bhome_msg.MsgQueryProcReply{}
   if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) {
      return rep.ProcList, nil
   } else {
      return nil, errors.New("QueryProcs ret flase")
   }
}