| | |
| | | */ |
| | | type Handle struct { |
| | | ctx context.Context |
| | | wg *sync.WaitGroup |
| | | // Create the reply corresponding to each channel and wait to read its contents (server). |
| | | // One of the entries must act as the server for the Request function. |
| | | m map[string]*sockServer |
| | |
| | | chReply chan TransInfo |
| | | } |
| | | |
| | | // garbageCollect blocks until ctx is done and then closes every socket held |
| | | // by h: first each entry of h.m, then sockHB, sockPub, sockSub, sockRep and |
| | | // sockWorker, in that order. It does not return before ctx is done. |
| | | func garbageCollect(ctx context.Context, h *Handle) { |
| | | // Nothing is released until the context has been cancelled or has expired. |
| | | <-ctx.Done() |
| | | |
| | | // Sockets stored in the map are closed first. |
| | | for _, server := range h.m { |
| | | server.sock.Close() |
| | | } |
| | | |
| | | // Then the individually named sockets. |
| | | h.sockHB.sock.Close() |
| | | h.sockPub.sock.Close() |
| | | h.sockSub.sock.Close() |
| | | h.sockRep.sock.Close() |
| | | h.sockWorker.sock.Close() |
| | | } |
| | | |
| | | func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) { |
| | | func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | wg.Done() |
| | | return |
| | | default: |
| | | if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil { |
| | |
| | | } |
| | | } |
| | | |
| | | wg := &sync.WaitGroup{} |
| | | |
| | | chSize := 5 |
| | | chSub := make(chan TransInfo, chSize) |
| | | chReply := make(chan TransInfo, chSize) |
| | |
| | | sockReply := OpenDgramSocket() |
| | | sockReply.Bind(int(regReply.ReplyKey)) |
| | | // Start the receive goroutine. |
| | | go recvRoutine(ctx, sockReply, chSub) |
| | | |
| | | wg.Add(1) |
| | | go recvRoutine(ctx, sockReply, wg, chSub) |
| | | repS := &sockServer{ |
| | | sock: sockReply, |
| | | info: info.ProcInfo, |
| | |
| | | sockSub.Sub(v, int(regReply.SubTopicKey)) |
| | | } |
| | | // Start the receive goroutine. |
| | | go recvRoutine(ctx, sockSub, chSub) |
| | | wg.Add(1) |
| | | go recvRoutine(ctx, sockSub, wg, chSub) |
| | | subC := &sockClient{ |
| | | sock: sockSub, |
| | | peer: -1, |
| | |
| | | } |
| | | handle := &Handle{ |
| | | ctx: ctx, |
| | | wg: wg, |
| | | m: m, |
| | | sockHB: hbC, |
| | | sockPub: pubC, |
| | |
| | | chReply: chReply, |
| | | } |
| | | |
| | | go garbageCollect(ctx, handle) |
| | | |
| | | return handle |
| | | } |
| | | |
| | | // Free waits until the WaitGroup stored in the handle reaches zero, then |
| | | // closes every socket in h.m and each of the named sockets (sockHB, sockPub, |
| | | // sockSub, sockRep, sockWorker), setting the named fields to nil afterwards. |
| | | // |
| | | // Free does not cancel the handle's context itself; it only waits on h.wg. |
| | | // |
| | | // Each named socket is nil-checked before it is closed, so calling Free a |
| | | // second time no longer panics on the fields cleared by the first call. The |
| | | // sockets in h.m are not cleared and are closed again on a repeated call. |
| | | func (h *Handle) Free() { |
| | | // Block until every goroutine tracked by the WaitGroup has called Done. |
| | | h.wg.Wait() |
| | | |
| | | for _, v := range h.m { |
| | | v.sock.Close() |
| | | } |
| | | |
| | | // A nil field means the socket was already released (or never set); |
| | | // skipping it avoids a nil-pointer dereference. |
| | | if h.sockHB != nil { |
| | | h.sockHB.sock.Close() |
| | | h.sockHB = nil |
| | | } |
| | | if h.sockPub != nil { |
| | | h.sockPub.sock.Close() |
| | | h.sockPub = nil |
| | | } |
| | | if h.sockSub != nil { |
| | | h.sockSub.sock.Close() |
| | | h.sockSub = nil |
| | | } |
| | | if h.sockRep != nil { |
| | | h.sockRep.sock.Close() |
| | | h.sockRep = nil |
| | | } |
| | | if h.sockWorker != nil { |
| | | h.sockWorker.sock.Close() |
| | | h.sockWorker = nil |
| | | } |
| | | |
| | | fmt.Println("Handle Safe Free") |
| | | } |
| | | |
| | | const ( |
| | |
| | | |
| | | // GetMesg get mesg for sub or reply |
| | | func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | | if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil, nil, -1 |
| | | } |
| | | |
| | | if len(h.chSub) > 1 { |
| | | m := <-h.chSub |
| | | subMsg = m.info |