zhangzengfei
2023-11-30 408f252ff3382ace333d96e85f49980a0e0b9b6f
hbusc.go
@@ -1,63 +1,47 @@
package bhomeclient
import (
   "basic.com/valib/bhomebus.git"
   "basic.com/valib/c_bhomebus.git/api/bhsgo"
   "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
   "context"
   "encoding/json"
   "errors"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
   "unsafe"
   "github.com/bytedance/sonic"
)
type sockServer struct {
   sock *bhomebus.Socket
   info *ProcInfo
}
type sockClient struct {
   sock *bhomebus.Socket
   peer int
}
type TransInfo struct {
   info *MsgInfo
   port int
type MsgReq struct {
   ProcId       string
   bhome_msg.MsgRequestTopic
   Src       unsafe.Pointer
}
type BHBus struct {
   ctx context.Context
   ctx       context.Context
   conf *Config
   ri          *RegisterInfo
   nodes []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
   mtxNode sync.Mutex   //访问节点主题表时,需要加锁
   conf       *Config
   m map[string]*sockServer
   nodes       []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
   mtxNode    sync.Mutex   //访问节点主题表时,需要加锁
   wg *sync.WaitGroup
   wg          *sync.WaitGroup
   sockRep *sockServer  //响应其他进程request的socket,server
   sockHB *sockClient  //维持心跳的socket,线程实时发送,需要单独处理
   sockPub *sockClient  //发布主题的socket,需要单独socket处理
   sockSub *sockClient  //订阅主题的socket,线程实时接收消息,需要单独处理
   sockWorker    *sockClient  //发给任意的server,短暂的request client
   mtxWorker    sync.Mutex    //SendAndRecv可能不是线程安全的
   chSub chan TransInfo
   chReply chan TransInfo
   ChSub   chan bhome_msg.MsgPublish
   ChReply chan MsgReq
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
   var data []byte
   var key int
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
   var procId string
   var msg bhome_msg.MsgRequestTopic
   var src unsafe.Pointer
   for {
      select {
      case <-ctx.Done():
@@ -65,22 +49,19 @@
         wg.Done()
         return
      default:
         n := s.RecvfromTimeout(&data, &key, 100) //目前10001返回值表示超时
         if n == 0 {
            var info MsgInfo
            if err := json.Unmarshal(data, &info);err == nil {
               ch <- TransInfo{
                  info: &info,
                  port: key,  //这个key在发布订阅模式中是bus的key,是个固定值,上层用不到
               }
               data = []byte{}
               key = 0
            } else {
               logFn("unmarshal to MsgInfo err:", err)
         if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
            ch <- MsgReq{
               procId,
               msg,
               src,
            }
            logFn("ReadRequest topic:", string(msg.Topic), " data:", string(msg.Data))
            procId = ""
            msg.Reset()
            src = unsafe.Pointer(nil)
         } else {
            time.Sleep(10 * time.Millisecond)
            time.Sleep(100 * time.Millisecond)
         }
      }
   }
@@ -88,38 +69,21 @@
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
   handle := &BHBus{
      conf: config,
      m: make(map[string]*sockServer),
      chSub: make(chan TransInfo, config.chSize),
      chReply: make(chan TransInfo, config.chSize),
   handle := &BHBus {
      ctx:     ctx,
      conf:    config,
      ri:      ri,
      wg: &sync.WaitGroup{},
      ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
      ChReply: make(chan MsgReq, config.chSize),
   }
   var err error
   err = bhomebus.Init("libshm_queue.so")
   if err != nil {
      handle.printLog("Init so err:", err)
      return nil, err
   }
   err = bhomebus.ShmInit(512)
   if err != nil {
      handle.printLog("shmInit size err:", err)
      return nil, err
   }
   regSock := bhomebus.OpenSocket()
   if regSock == nil {
      handle.printLog("Open Socket ret Nil")
      return nil, errors.New("OpenSocket ret Nil")
   }
   defer func() {
      regSock.Close()
      handle.printLog("regSock.CLose")
   }()
   var msg []byte
   var regAddr []bhomebus.NetNode
   var regR *RegisterReply  //注册结果信息
   //如果注册失败,就会一直尝试注册
   procI := bhome_msg.ProcInfo{
      ProcId: []byte(ri.Proc.ID),
      Name: []byte(ri.Proc.Name),
   }
   var regReply bhome_msg.MsgCommonReply
loop:
   for {
      select {
@@ -127,164 +91,115 @@
         handle.printLog("register <-q")
         return nil,errors.New("ctx is done")
      default:
         if msg == nil {
            rid, err := json.Marshal(*ri)
            if err != nil {
               handle.printLog("marshal registerInfo err:", err)
               return nil, errors.New("marshal registerInfo err:"+err.Error())
            }
            s := MsgInfo{
               SrcProc: ri.Proc,
               MsgType: MesgType_ReqRep,
               Topic:   TOPIC_REGISTER,
               Body:    rid,
            }
            handle.printLog("register MsgInfo:", s)
            dRegData,err := json.Marshal(s)
            if err != nil {
               handle.printLog("marshal deregister msg err:", err)
               return nil, err
            }
            msg = dRegData
         }
         if regAddr == nil {
            regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
               Key: handle.conf.regKey,
            })
         }
         var rMsg []bhomebus.Mesg
         n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n代表成功发送的节点的个数
         handle.printLog("regSock.SendandrecvTimeout n:", n)
         if n == 1 && len(rMsg) == 1 {
            var cr Reply
            if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
               handle.printLog("unmarshal regReply err:", err)
               return nil, errors.New("unmarshal regReply err:"+err.Error())
            } else {
               if cr.Success {
                  if rpd,err := json.Marshal(cr.Data);err ==nil {
                     var rr RegisterReply
                     if err = json.Unmarshal(rpd, &rr); err == nil {
                        regR = &rr
                        break loop
                     } else {
                        handle.printLog("unmarshal RegisterReply err:", err)
                     }
                  } else {
                     handle.printLog("marshal cr.Data err:", err)
                  }
               } else {
                  handle.printLog("cr:", cr)
               }
            }
         if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
            break loop
         } else {
            time.Sleep(1 * time.Second)
            time.Sleep(time.Second)
         }
      }
   }
   handle.printLog("register Reply:", *regR)
   for _, v := range ri.Channel {
      if k,ok := regR.ChannelKey[v];ok {
         s := bhomebus.OpenSocket()
         s.ForceBind(int(k))
         handle.m[v] = &sockServer{
            sock: s,
            info: &ri.Proc,
         }
      }
   }
   //维持心跳的socket
   sockHB := bhomebus.OpenSocket()
   handle.printLog("open sockHB")
   handle.sockHB = &sockClient{
      sock: sockHB,
      peer: int(regR.HeartbeatKey),
   }
   handle.wg = &sync.WaitGroup{}
   if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
      sockReply := bhomebus.OpenSocket()
      sockReply.ForceBind(int(regR.ReplyKey))
      handle.printLog("after pubTopic forceBind")
      handle.wg.Add(1)
      //serve server reply
      go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
      handle.sockRep = &sockServer{
         sock: sockReply,
         info: &ri.Proc,
      topics := bhome_msg.MsgTopicList{}
      var regTopicReply bhome_msg.MsgCommonReply
      for _,t := range ri.PubTopic {
         topics.TopicList = append(topics.TopicList, []byte(t))
      }
   loopRT:
      for {
         select {
         case <-q:
            handle.printLog("RegisterTopics recv quit signal")
            return nil, errors.New("RegisterTopics recv quit signal")
         default:
            if bhsgo.RegisterTopics(&topics, &regTopicReply, handle.conf.sendTimeOut) {
               handle.printLog("bhsgo.RegisterTopics success!!")
               break loopRT
            } else {
               time.Sleep(time.Second)
            }
         }
      }
      handle.wg.Add(1)
      go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
   }
   //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key
   sockPub := bhomebus.OpenSocket()
   handle.sockPub = &sockClient{
      sock: sockPub,
      peer: -1,
   }
   handle.printLog("register done!" )
   //有订阅消息才需要启动协程接收消息
   if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
      //订阅消息的socket
      sockSub := bhomebus.OpenSocket()
      //订阅所有主题
      handle.printLog("start Sub topics")
   if len(ri.SubTopic) > 0 {
      handle.printLog("sub topics")
      var subList bhome_msg.MsgTopicList
      for _,v := range ri.SubTopic {
         subN := sockSub.Sub(v)
         handle.printLog("subTopic:", v, " ret n:", subN)
         subList.TopicList = append(subList.TopicList, []byte(v))
      }
      //启动订阅信息接收
      handle.wg.Add(1)
      go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
      handle.sockSub = &sockClient{
         sock: sockSub,
         peer: -1,
      var subReply bhome_msg.MsgCommonReply
      if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
         handle.printLog("bhsgo.Subscribe ret false")
      }
   }
   sockWorker := bhomebus.OpenSocket()
   handle.sockWorker = &sockClient{
      sock: sockWorker,
      peer: int(regR.QueryTopicKey),
   if len(ri.SubNetTopic) > 0 {
      handle.printLog("sub net topics")
      var subNetList bhome_msg.MsgTopicList
      for _,v := range ri.SubNetTopic {
         subNetList.TopicList = append(subNetList.TopicList, []byte(v))
      }
      var subNetReply bhome_msg.MsgCommonReply
      if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) {
         handle.printLog("bhsgo.SubscribeNet ret false")
      }
   }
   if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 {
      //启动订阅信息接收
      handle.wg.Add(1)
      go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
   }
   return handle, nil
}
func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
   var procId string
   var msg bhome_msg.MsgPublish
   for {
      select {
      case <-ctx.Done():
         logFn("recvSubRoutine ctx.Done")
         wg.Done()
         return
      default:
         if bhsgo.ReadSub(&procId, &msg, 100) {
            ch <- msg
            logFn("ReadSub topic:", string(msg.Topic), " len(data):", len(msg.Data))
            procId = ""
            msg.Reset()
         } else {
            //time.Sleep(100 * time.Millisecond)
         }
      }
   }
}
//DeRegister
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
   data, err := json.Marshal(*dri)
   if err != nil {
      return err
   }
   dRegData,err := json.Marshal(MsgInfo{
      MsgType: "",
      Topic: TOPIC_DEREGISTER,
      Body: data,
   })
   if err != nil {
      return err
   h.printLog("DeRegister")
   req := bhome_msg.ProcInfo{
      ProcId: []byte(h.ri.Proc.ID),
      Name: []byte(h.ri.Proc.Name),
   }
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: h.conf.regKey,
   })
   var retMsg []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
   if n == 0 {
      return nil
   reply := bhome_msg.MsgCommonReply{}
   if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
      h.printLog("Unregister false! ")
      return errors.New("Unregister false! ")
   }
   h.printLog("DeRegister retMsg:", retMsg)
   return fmt.Errorf("DeRegister n:%d", n)
   return nil
}
func (h *BHBus) printLog(v ...interface{}) {
@@ -297,76 +212,29 @@
func (h *BHBus) Free() {
   h.printLog("call BHBus free")
   h.wg.Wait()
   bhsgo.Cleanup()
   h.printLog("h.wg.Wait done")
   for _,v := range h.m {
      v.sock.Close()
   }
   if h.sockRep != nil {
      h.sockRep.sock.Close()
      h.sockRep = nil
   }
   if h.sockHB != nil {
      h.sockHB.sock.Close()
      h.sockHB = nil
   }
   if h.sockPub != nil {
      h.sockPub.sock.Close()
      h.sockPub = nil
   }
   if h.sockSub != nil {
      h.sockSub.sock.Close()
      h.sockSub = nil
   }
   if h.sockWorker != nil {
      h.sockWorker.sock.Close()
      h.sockWorker = nil
   }
   h.printLog("BHBus Freed")
}
//HeartBeat send
func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
   data, err := json.Marshal(*info)
   if err == nil {
      hbd,err := json.Marshal(MsgInfo{
         SrcProc: info.Proc,
         MsgType: MesgType_ReqRep,
         Topic:   TOPIC_HEARTBEAT,
         Body:    data,
      })
      if err != nil {
         h.printLog("marshal heartbeat msgInfo err:", err)
         return err
      }
      var rMsg []bhomebus.Mesg
      hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
         Key: h.sockHB.peer,
      })
      //h.printLog("start send heartbeat")
      n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n代表成功发送的节点的个数
      //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
      if n > 0 {
         return nil
      } else {
         return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
      }
func (h *BHBus) HeartBeat() error {
   procI := bhome_msg.ProcInfo{
      ProcId: []byte(h.ri.Proc.ID),
      Name: []byte(h.ri.Proc.Name),
   }
   return err
   var ret bhome_msg.MsgCommonReply
   if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
      return nil
   } else {
      return errors.New("send heartBeat return false")
   }
}
//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
//   n := s.sock.SendtoTimeout(data, s.peer, timeout)
//   if n == 0 {
//      return nil
//   }
//   return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
//}
//更新主题列表
func (h *BHBus)  UpdateNodeTopics(arr []NodeInfo) {
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
   h.mtxNode.Lock()
   defer h.mtxNode.Unlock()
   h.nodes = arr
@@ -375,174 +243,120 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
   h.mtxNode.Lock()
   defer h.mtxNode.Unlock()
   var nodes []bhomebus.NetNode
   reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: h.sockWorker.peer,
   })
   reqD,err := json.Marshal(MsgInfo{
      SrcProc: *srcProc,
      MsgType: MesgType_ReqRep,
      Topic:   TOPIC_QUERYKEY,
      Body:    []byte(topic),
   })
   if err != nil {
      return nil, fmt.Errorf("marshal req err:%s", err.Error())
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
   dest := bhome_msg.BHAddress{}
   reqTopic := bhome_msg.MsgQueryTopic{
      Topic: []byte(topic),
   }
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
   if n > 0 {
      var reply Reply
      err = json.Unmarshal(ret[0].Data, &reply)
      if err != nil {
         h.printLog("unmarshal err:", err)
         return nil, err
      }
      if reply.Success {
         rd,err := json.Marshal(reply.Data)
         if err == nil {
            err = json.Unmarshal(rd, &nodes)
            if err == nil {
               return nodes, nil
            } else {
               h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
               return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
            }
         } else {
            return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
         }
      } else {
         h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
         return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
      }
   } else {
      return nil,   fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
   rep := bhome_msg.MsgQueryTopicReply{}
   if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) {
      return rep.NodeAddress, nil
   }
   if rep.Errmsg != nil {
      h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString))
      return nil, errors.New(string(rep.Errmsg.ErrString))
   }
   return nil, errors.New("bhsgo.QueryTopicAddress ret false")
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
   //1.首先需要通过topic拿到本机对应的NetNode
   rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
   h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err)
   if err != nil {
      return nil, err
   }
   //2.将请求返送到对应的server,并等待返回值
   data, err := json.Marshal(*req)
   h.printLog("marshal(*req) err:", err)
   if err != nil {
      return nil, err
   }
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
   if n > 0 && len(ret) > 0 {
      var resp Reply
      if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
         return &resp, nil
      } else {
         h.printLog("unmarshal ret[0].Data err:", err)
         return nil, err
   pid := ""
   mrt := bhome_msg.MsgRequestTopicReply{}
   dest := bhome_msg.BHAddress{}
   if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
      var reply Reply
      if err := json.Unmarshal(mrt.Data, &reply); err != nil {
         h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
         return nil,err
      }
      return &reply, nil
   } else {
      h.printLog("Request n: ", n, " len(ret): ", len(ret))
      i, s := bhsgo.GetLastError()
      h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic))
      return nil, errors.New("request ")
   }
   return nil, fmt.Errorf("request err")
}
func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
   if n > 0 && len(ret) > 0 {
      return ret[0].Data, nil
   } else {
      h.printLog("Request n: ", n, " len(ret): ", len(ret))
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
   dest := bhome_msg.BHAddress{}
   if destArr != nil && len(destArr) > 0 {
      if destArr[0].Addr != nil {
         dest = *(destArr[0].Addr)
      }
   }
   return nil, fmt.Errorf("request err")
   pid := ""
   r := bhome_msg.MsgRequestTopicReply{}
   if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
      return r.Data, nil
   } else {
      i, s := bhsgo.GetLastError()
      h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest)
      return nil, errors.New("bhsgo.Request return false")
   }
}
func (h *BHBus) Reply(replyKey int, i *Reply) error {
   data,err := json.Marshal(*i)
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
   defer func() {
      i = nil
   }()
   data,err := sonic.Marshal(i)
   if err != nil {
      return err
   }
   n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
   h.printLog("reply to key:", replyKey, " n:",n)
   if n != 0 {
      return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
   rep := bhome_msg.MsgRequestTopicReply{
      Data: data,
   }
   return nil
   if bhsgo.SendReply(src, &rep) {
      return nil
   }
   return errors.New("reply return false")
}
//只发送请求,不需要应答.
//暴露在上层的,只有topic,没有key。
func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
   data,err := json.Marshal(*arg)
   if err != nil {
      return err
func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
   dest := &bhome_msg.BHAddress{}
   topic := &bhome_msg.MsgQueryProc{}
   rep := &bhome_msg.MsgQueryProcReply{}
   if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) {
      return rep.ProcList, nil
   } else {
      return nil, errors.New("QueryProcs ret flase")
   }
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
   if n != 0 {
      return fmt.Errorf("sendOnly ret n:%d", n)
   }
   return nil
}
func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
   data, err := json.Marshal(*req)
   if err != nil {
      return nil, err
   }
   rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: KEY_QUERY,
   })
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
   h.printLog("requestCenter n:", n, "len(ret):", len(ret))
   if n > 0 && len(ret) > 0{
      var cr Reply
      if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
         return &cr, nil
      } else {
         h.printLog("unmarshal to CommonReply err:", err)
      }
   }
   return nil, fmt.Errorf("request center err")
}
//向主题通道中发布消息
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
   data,err := json.Marshal(*msg)
   if err == nil {
      if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
         return nil
      } else {
         return fmt.Errorf("pub err n:%d", n)
      }
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
   if bhsgo.Publish(msg, h.conf.pubTimeOut) {
      return nil
   } else {
      return fmt.Errorf("pub err ")
   }
}
   return err
func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
   if bhsgo.Publish(msg, timeout) {
      return 1
   }
   return -1
}
//追加订阅的主题消息
func (h *BHBus) Sub(topics []string) {
   if topics != nil {
      for _,t := range topics {
         h.sockSub.sock.Sub(t)
   if topics != nil && len(topics) >0 {
      var subList bhome_msg.MsgTopicList
      for _, v := range topics {
         subList.TopicList = append(subList.TopicList, []byte(v))
      }
      var subReply bhome_msg.MsgCommonReply
      if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
         h.printLog("sub topics")
      }
   }
}
@@ -550,28 +364,6 @@
//注销订阅的主题
func (h *BHBus) DeSub(topics []string) {
   if topics != nil {
      for _,t := range topics {
         if n := h.sockSub.sock.Desub(t); n != 0 {
            h.printLog("DeSub topic:", t, " n:", n)
         }
      }
   }
}
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
   if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil,nil, -1
   }
   if len(h.chSub) >0 {
      m  := <-h.chSub
      subMsg = m.info
   }
   if len(h.chReply) >0 {
      m := <-h.chReply
      replyMsg = m.info
      replyKey = m.port
   }
   return
}