liuxiaolong
2021-02-07 6f7957c42c409624ca7a05c54bd35752f996ba68
hbusc.go
@@ -7,6 +7,7 @@
   "errors"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
)
@@ -50,6 +51,7 @@
   //mtxWorker    sync.Mutex    //SendAndRecv可能不是线程安全的
   chSub chan TransInfo
   chReply chan TransInfo
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
@@ -84,23 +86,23 @@
   }
}
func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
   for {
      select {
      case <-ctx.Done():
         logFn("recvandsendRoutine ctx.Done")
         wg.Done()
         return
      default:
         n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
         if n != 0 {
            logFn("RecvandsendTimeout success")
         } else {
            //time.Sleep(100 * time.Millisecond)
         }
      }
   }
}
//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
//   for {
//      select {
//      case <-ctx.Done():
//         logFn("recvandsendRoutine ctx.Done")
//         wg.Done()
//         return
//      default:
//         n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
//         if n != 0 {
//            logFn("RecvandsendTimeout success")
//         } else {
//            //time.Sleep(100 * time.Millisecond)
//         }
//      }
//   }
//}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
@@ -109,6 +111,7 @@
      conf: config,
      m: make(map[string]*sockServer),
      chSub: make(chan TransInfo, config.chSize),
      chReply: make(chan TransInfo, config.chSize),
   }
   var err error
@@ -227,9 +230,9 @@
      sockReply := bhomebus.OpenSocket()
      sockReply.ForceBind(int(regR.ReplyKey))
      handle.printLog("after pubTopic forceBind")
      //handle.wg.Add(1)
      handle.wg.Add(1)
      //serve server reply
      //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
      go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
      handle.sockRep = &sockServer{
         sock: sockReply,
         info: &ri.Proc,
@@ -484,19 +487,19 @@
   return nil, fmt.Errorf("request err")
}
//func (h *BHBus) Reply(replyKey int, i *Reply) error {
//   data,err := json.Marshal(*i)
//   if err != nil {
//      return err
//   }
//
//   n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
//   h.printLog("reply to key:", replyKey, " n:",n)
//   if n != 0 {
//      return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
//   }
//   return nil
//}
func (h *BHBus) Reply(replyKey int, i *Reply) error {
   data,err := json.Marshal(*i)
   if err != nil {
      return err
   }
   n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
   h.printLog("reply to key:", replyKey, " n:",n)
   if n != 0 {
      return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
   }
   return nil
}
//只发送请求,不需要应答.
@@ -584,13 +587,30 @@
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
   if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil
//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
//   if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
//      return nil
//   }
//   if len(h.chSub) >0 {
//      m  := <-h.chSub
//      subMsg = m.info
//   }
//   return
//}
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
   if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil, nil, -1
   }
   if len(h.chSub) >0 {
      m  := <-h.chSub
      subMsg = m.info
   }
   if len(h.chReply) >0 {
      m := <-h.chReply
      replyMsg = m.info
      replyKey = m.port
   }
   return
}