zhangmeng
2020-08-03 fc639da8d408be5dbff96c008ec3936ef683f565
add Free
1个文件已修改
55 ■■■■■ 已修改文件
library.go 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go
@@ -42,6 +42,7 @@
*/
type Handle struct {
    ctx context.Context
    wg  *sync.WaitGroup
    // 创建channel对应的reply,等待读取其中的内容,server
    // 其中必须有一个作为Request函数的server
    m map[string]*sockServer
@@ -65,24 +66,11 @@
    chReply chan TransInfo
}
func garbageCollect(ctx context.Context, h *Handle) {
    <-ctx.Done()
    for _, v := range h.m {
        v.sock.Close()
    }
    h.sockHB.sock.Close()
    h.sockPub.sock.Close()
    h.sockSub.sock.Close()
    h.sockRep.sock.Close()
    h.sockWorker.sock.Close()
}
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo) {
    for {
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
            if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
@@ -150,6 +138,8 @@
        }
    }
    wg := &sync.WaitGroup{}
    chSize := 5
    chSub := make(chan TransInfo, chSize)
    chReply := make(chan TransInfo, chSize)
@@ -158,7 +148,9 @@
    sockReply := OpenDgramSocket()
    sockReply.Bind(int(regReply.ReplyKey))
    // 启动接收线程
    go recvRoutine(ctx, sockReply, chSub)
    wg.Add(1)
    go recvRoutine(ctx, sockReply, wg, chSub)
    repS := &sockServer{
        sock: sockReply,
        info: info.ProcInfo,
@@ -184,7 +176,8 @@
        sockSub.Sub(v, int(regReply.SubTopicKey))
    }
    // 启动接收线程
    go recvRoutine(ctx, sockSub, chSub)
    wg.Add(1)
    go recvRoutine(ctx, sockSub, wg, chSub)
    subC := &sockClient{
        sock: sockSub,
        peer: -1,
@@ -198,6 +191,7 @@
    }
    handle := &Handle{
        ctx:        ctx,
        wg:         wg,
        m:          m,
        sockHB:     hbC,
        sockPub:    pubC,
@@ -208,9 +202,28 @@
        chReply:    chReply,
    }
    go garbageCollect(ctx, handle)
    return handle
}
// Free free
func (h *Handle) Free() {
    h.wg.Wait()
    for _, v := range h.m {
        v.sock.Close()
    }
    h.sockHB.sock.Close()
    h.sockHB = nil
    h.sockPub.sock.Close()
    h.sockPub = nil
    h.sockSub.sock.Close()
    h.sockSub = nil
    h.sockRep.sock.Close()
    h.sockRep = nil
    h.sockWorker.sock.Close()
    h.sockWorker = nil
    fmt.Println("Handle Safe Free")
}
const (
@@ -323,6 +336,10 @@
// GetMesg get mesg for sub or reply
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
    if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil, nil, -1
    }
    if len(h.chSub) > 1 {
        m := <-h.chSub
        subMsg = m.info