zhangmeng
2020-08-03 fc639da8d408be5dbff96c008ec3936ef683f565
library.go
@@ -42,6 +42,7 @@
*/
type Handle struct {
   ctx context.Context
   wg  *sync.WaitGroup
   // 创建channel对应的reply,等待读取其中的内容,server
   // 其中必须有一个作为Request函数的server
   m map[string]*sockServer
@@ -65,24 +66,11 @@
   chReply chan TransInfo
}
func garbageCollect(ctx context.Context, h *Handle) {
   <-ctx.Done()
   for _, v := range h.m {
      v.sock.Close()
   }
   h.sockHB.sock.Close()
   h.sockPub.sock.Close()
   h.sockSub.sock.Close()
   h.sockRep.sock.Close()
   h.sockWorker.sock.Close()
}
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo) {
   for {
      select {
      case <-ctx.Done():
         wg.Done()
         return
      default:
         if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
@@ -150,6 +138,8 @@
      }
   }
   wg := &sync.WaitGroup{}
   chSize := 5
   chSub := make(chan TransInfo, chSize)
   chReply := make(chan TransInfo, chSize)
@@ -158,7 +148,9 @@
   sockReply := OpenDgramSocket()
   sockReply.Bind(int(regReply.ReplyKey))
   // 启动接收线程
   go recvRoutine(ctx, sockReply, chSub)
   wg.Add(1)
   go recvRoutine(ctx, sockReply, wg, chSub)
   repS := &sockServer{
      sock: sockReply,
      info: info.ProcInfo,
@@ -184,7 +176,8 @@
      sockSub.Sub(v, int(regReply.SubTopicKey))
   }
   // 启动接收线程
   go recvRoutine(ctx, sockSub, chSub)
   wg.Add(1)
   go recvRoutine(ctx, sockSub, wg, chSub)
   subC := &sockClient{
      sock: sockSub,
      peer: -1,
@@ -198,6 +191,7 @@
   }
   handle := &Handle{
      ctx:        ctx,
      wg:         wg,
      m:          m,
      sockHB:     hbC,
      sockPub:    pubC,
@@ -208,9 +202,28 @@
      chReply:    chReply,
   }
   go garbageCollect(ctx, handle)
   return handle
}
// Free free
func (h *Handle) Free() {
   h.wg.Wait()
   for _, v := range h.m {
      v.sock.Close()
   }
   h.sockHB.sock.Close()
   h.sockHB = nil
   h.sockPub.sock.Close()
   h.sockPub = nil
   h.sockSub.sock.Close()
   h.sockSub = nil
   h.sockRep.sock.Close()
   h.sockRep = nil
   h.sockWorker.sock.Close()
   h.sockWorker = nil
   fmt.Println("Handle Safe Free")
}
const (
@@ -323,6 +336,10 @@
// GetMesg get mesg for sub or reply
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
   if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
      return nil, nil, -1
   }
   if len(h.chSub) > 1 {
      m := <-h.chSub
      subMsg = m.info