liuxiaolong
2021-01-06 c4e51a45643377624f80829066d99008c6623458
hbusc.go
@@ -55,12 +55,13 @@
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo) {
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
   var data []byte
   var key int
   for {
      select {
      case <-ctx.Done():
         logFn("recvRoutine ctx.Done")
         wg.Done()
         return
      default:
@@ -75,6 +76,8 @@
               data = []byte{}
               key = 0
            }
         } else {
            time.Sleep(10 * time.Millisecond)
         }
      }
   }
@@ -195,7 +198,7 @@
   sockReply.ForceBind(int(regR.ReplyKey))
   handle.wg.Add(1)
   //serve server reply
   go recvRoutine(ctx, sockReply, handle.wg, handle.chReply)
   go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
   handle.sockRep = &sockServer{
      sock: sockReply,
      info: &ri.Proc,
@@ -224,7 +227,7 @@
   //启动订阅信息接收
   handle.wg.Add(1)
   go recvRoutine(ctx, sockSub, handle.wg, handle.chSub)
   go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
   handle.sockSub = &sockClient{
      sock: sockSub,
      peer: -1,