liuxiaolong
2021-02-07 abfd401180d1eb09c8ed1c24797c3e503e45fa08
hbusc.go
@@ -7,7 +7,6 @@
   "errors"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
)
@@ -48,23 +47,24 @@
   sockSub *sockClient  //订阅主题的socket,线程实时接收消息,需要单独处理
   sockWorker    *sockClient  //发给任意的server,短暂的request client
   mtxWorker    sync.Mutex    //SendAndRecv可能不是线程安全的
   //mtxWorker    sync.Mutex    //SendAndRecv可能不是线程安全的
   chSub chan TransInfo
   chReply chan TransInfo
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo) {
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
   var data []byte
   var key int
   for {
      select {
      case <-ctx.Done():
         logFn("recvRoutine ctx.Done")
         wg.Done()
         return
      default:
         if n := s.RecvfromTimeout(&data, &key, 10);n == 0 {
         n := s.RecvfromTimeout(&data, &key, 1000) //目前10001返回值表示超时
         if n == 0 {
            var info MsgInfo
            if err := json.Unmarshal(data, &info);err == nil {
               ch <- TransInfo{
@@ -74,7 +74,29 @@
               data = []byte{}
               key = 0
            } else {
               logFn("unmarshal to MsgInfo err:", err)
            }
         } else {
            time.Sleep(100 * time.Millisecond)
         }
      }
   }
}
func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
   for {
      select {
      case <-ctx.Done():
         logFn("recvandsendRoutine ctx.Done")
         wg.Done()
         return
      default:
         n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
         if n != 0 {
            logFn("RecvandsendTimeout success")
         } else {
            //time.Sleep(100 * time.Millisecond)
         }
      }
   }
@@ -82,11 +104,11 @@
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
   handle := &BHBus{
   handle := &BHBus {
      ctx: ctx,
      conf: config,
      m: make(map[string]*sockServer),
      chSub: make(chan TransInfo, config.chSize),
      chReply: make(chan TransInfo, config.chSize),
   }
   var err error
@@ -148,30 +170,32 @@
         }
         var rMsg []bhomebus.Mesg
         n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n代表成功发送的节点的个数
         handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
         n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n代表成功发送的节点的个数
         handle.printLog("regSock.SendandrecvTimeout n:", n)
         if n == 1 && len(rMsg) == 1 {
            var cr CommonReply
            var cr Reply
            if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
               handle.printLog("unmarshal regReply err:", err)
               return nil, errors.New("unmarshal regReply err:"+err.Error())
            } else {
               if cr.Status == REPLY_SUCCESS {
                  var rr RegisterReply
                  if err = json.Unmarshal(cr.Body, &rr);err ==nil {
                     regR = &rr
                     break loop
               if cr.Success {
                  if rpd,err := json.Marshal(cr.Data);err ==nil {
                     var rr RegisterReply
                     if err = json.Unmarshal(rpd, &rr); err == nil {
                        regR = &rr
                        break loop
                     } else {
                        handle.printLog("unmarshal RegisterReply err:", err)
                     }
                  } else {
                     handle.printLog("unmarshal RegisterReply err:", err)
                     handle.printLog("marshal cr.Data err:", err)
                  }
               } else {
                  handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc)
                  handle.printLog("cr:", cr)
               }
            }
         } else {
            time.Sleep(100 * time.Millisecond)
            time.Sleep(1 * time.Second)
         }
      }
   }
@@ -181,7 +205,7 @@
   for _, v := range ri.Channel {
      if k,ok := regR.ChannelKey[v];ok {
         s := bhomebus.OpenSocket()
         s.Bind(int(k))
         s.ForceBind(int(k))
         handle.m[v] = &sockServer{
            sock: s,
            info: &ri.Proc,
@@ -189,24 +213,29 @@
      }
   }
   handle.wg = &sync.WaitGroup{}
   sockReply := bhomebus.OpenSocket()
   sockReply.Bind(int(regR.ReplyKey))
   handle.wg.Add(1)
   //serve server reply
   go recvRoutine(ctx, sockReply, handle.wg, handle.chReply)
   handle.sockRep = &sockServer{
      sock: sockReply,
      info: &ri.Proc,
   }
   //维持心跳的socket
   sockHB := bhomebus.OpenSocket()
   handle.printLog("open sockHB")
   handle.sockHB = &sockClient{
      sock: sockHB,
      peer: int(regR.HeartbeatKey),
   }
   handle.wg = &sync.WaitGroup{}
   if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
      sockReply := bhomebus.OpenSocket()
      sockReply.ForceBind(int(regR.ReplyKey))
      handle.printLog("after pubTopic forceBind")
      //handle.wg.Add(1)
      //serve server reply
      //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
      handle.sockRep = &sockServer{
         sock: sockReply,
         info: &ri.Proc,
      }
   }
   //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key
   sockPub := bhomebus.OpenSocket()
@@ -215,19 +244,24 @@
      peer: -1,
   }
   //订阅消息的socket
   sockSub := bhomebus.OpenSocket()
   //订阅所有主题
   for _,v := range ri.SubTopic {
      sockSub.Sub(v)
   }
   //有订阅消息才需要启动协程接收消息
   if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
      //订阅消息的socket
      sockSub := bhomebus.OpenSocket()
      //订阅所有主题
      handle.printLog("start Sub topics")
      for _,v := range ri.SubTopic {
         subN := sockSub.Sub(v)
         handle.printLog("subTopic:", v, " ret n:", subN)
      }
   //启动订阅信息接收
   handle.wg.Add(1)
   go recvRoutine(ctx, sockSub, handle.wg, handle.chSub)
   handle.sockSub = &sockClient{
      sock: sockSub,
      peer: -1,
      //启动订阅信息接收
      handle.wg.Add(1)
      go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
      handle.sockSub = &sockClient{
         sock: sockSub,
         peer: -1,
      }
   }
   sockWorker := bhomebus.OpenSocket()
@@ -255,8 +289,8 @@
      return err
   }
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   //h.mtxWorker.Lock()
   //defer h.mtxWorker.Unlock()
   netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: h.conf.regKey,
   })
@@ -277,7 +311,9 @@
//Release
func (h *BHBus) Free() {
   h.printLog("call BHBus free")
   h.wg.Wait()
   h.printLog("h.wg.Wait done")
   for _,v := range h.m {
      v.sock.Close()
   }
@@ -355,74 +391,112 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string, topic string) ([]bhomebus.NetNode,error) {
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
   h.mtxNode.Lock()
   defer h.mtxNode.Unlock()
   var nodes []bhomebus.NetNode
   if h.nodes != nil {
      for _,n := range h.nodes {
         if serverId != "" { //获取指定节点的
            if n.SvrInfo.ID == serverId {
               if k,ok := n.Topic2Key[topic];ok {
                  nodes = append(nodes, bhomebus.NetNode{
                     IPHost:n.SvrInfo.IP,
                     Port:n.SvrInfo.Port,
                     Key:k,
                  })
               }
            }
         } else { //获取所有节点的
            if k,ok := n.Topic2Key[topic];ok {
               nodes = append(nodes, bhomebus.NetNode{
                  IPHost:n.SvrInfo.IP,
                  Port:n.SvrInfo.Port,
                  Key:k,
               })
            }
         }
   reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: h.sockWorker.peer,
   })
   reqD,err := json.Marshal(MsgInfo{
      SrcProc: *srcProc,
      MsgType: MesgType_ReqRep,
      Topic:   TOPIC_QUERYKEY,
      Body:    []byte(topic),
   })
   if err != nil {
      return nil, fmt.Errorf("marshal req err:%s", err.Error())
   }
   var ret []bhomebus.Mesg
   n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
   if n > 0 {
      var reply Reply
      err = json.Unmarshal(ret[0].Data, &reply)
      if err != nil {
         h.printLog("unmarshal err:", err)
         return nil, err
      }
      if reply.Success {
         rd,err := json.Marshal(reply.Data)
         if err == nil {
            err = json.Unmarshal(rd, &nodes)
            if err == nil {
               return nodes, nil
            } else {
               h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
               return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
            }
         } else {
            return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
         }
      } else {
         h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
         return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
      }
   } else {
      return nil,   fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
   }
   if len(nodes) == 0 {
      return nil,fmt.Errorf("topic not found in nodes")
   }
   return nodes, nil
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
   //1.首先需要通过topic拿到本机对应的NetNode
   rNodes, err := h.GetNetNodeByTopic(serverId, req.Topic)
   rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
   h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err)
   if err != nil {
      return nil, err
   }
   //2.将请求返送到对应的server,并等待返回值
   data, err := json.Marshal(*req)
   h.printLog("marshal(*req) err:", err)
   if err != nil {
      return nil, err
   }
   var ret []bhomebus.Mesg
   if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs);n == 0 {
      if len(ret) > 0 {
         if err = json.Unmarshal(ret[0].Data, resp); err == nil {
            return resp, nil
         }
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
   if n > 0 && len(ret) > 0 {
      var resp Reply
      if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
         return &resp, nil
      } else {
         h.printLog("unmarshal ret[0].Data err:", err)
         return nil, err
      }
   } else {
      h.printLog("Request n: ", n, " len(ret): ", len(ret))
   }
   return nil, fmt.Errorf("request err")
}
func (h *BHBus) Reply(replyKey int, i MsgInfo) error {
   data,err := json.Marshal(i)
   if err != nil {
      return err
   }
func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
   var ret []bhomebus.Mesg
   n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
   if n != 0 {
      return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
   if n > 0 && len(ret) > 0 {
      return ret[0].Data, nil
   } else {
      h.printLog("Request n: ", n, " len(ret): ", len(ret))
   }
   return nil
   return nil, fmt.Errorf("request err")
}
//func (h *BHBus) Reply(replyKey int, i *Reply) error {
//   data,err := json.Marshal(*i)
//   if err != nil {
//      return err
//   }
//
//   n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
//   h.printLog("reply to key:", replyKey, " n:",n)
//   if n != 0 {
//      return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
//   }
//   return nil
//}
//只发送请求,不需要应答.
@@ -432,8 +506,8 @@
   if err != nil {
      return err
   }
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   //h.mtxWorker.Lock()
   //defer h.mtxWorker.Unlock()
   n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
   if n != 0 {
      return fmt.Errorf("sendOnly ret n:%d", n)
@@ -441,7 +515,7 @@
   return nil
}
func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) {
func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
   data, err := json.Marshal(*req)
   if err != nil {
      return nil, err
@@ -449,15 +523,17 @@
   rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: KEY_QUERY,
   })
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   //h.mtxWorker.Lock()
   //defer h.mtxWorker.Unlock()
   var ret []bhomebus.Mesg
   if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut);n == 0 {
      if len(ret) > 0 {
         var cr *CommonReply
         if err = json.Unmarshal(ret[0].Data, cr); err == nil {
            return cr, nil
         }
   n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
   h.printLog("requestCenter n:", n, "len(ret):", len(ret))
   if n > 0 && len(ret) > 0{
      var cr Reply
      if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
         return &cr, nil
      } else {
         h.printLog("unmarshal to CommonReply err:", err)
      }
   }
   return nil, fmt.Errorf("request center err")
@@ -468,7 +544,7 @@
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
   data,err := json.Marshal(*msg)
   if err == nil {
      if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 {
      if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
         return nil
      } else {
         return fmt.Errorf("pub err n:%d", n)
@@ -476,6 +552,14 @@
   }
   return err
}
func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
   data,err := json.Marshal(*msg)
   if err == nil {
      return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
   }
   return -1
}
//追加订阅的主题消息
@@ -500,18 +584,13 @@
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
   if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil,nil, -1
func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
   if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil
   }
   if len(h.chSub) >1 {
   if len(h.chSub) >0 {
      m  := <-h.chSub
      subMsg = m.info
   }
   if len(h.chReply) > 1 {
      m := <-h.chReply
      replyMsg = m.info
      replyKey = m.port
   }
   return
}