| | |
| | | } |
| | | |
| | | //获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。 |
| | | func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo) { |
| | | func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) { |
| | | var data []byte |
| | | var key int |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logFn("recvRoutine ctx.Done") |
| | | wg.Done() |
| | | return |
| | | default: |
| | |
| | | data = []byte{} |
| | | key = 0 |
| | | } |
| | | } else { |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply) |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | |
| | | |
| | | //启动订阅信息接收 |
| | | handle.wg.Add(1) |
| | | go recvRoutine(ctx, sockSub, handle.wg, handle.chSub) |
| | | go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) |
| | | handle.sockSub = &sockClient{ |
| | | sock: sockSub, |
| | | peer: -1, |