reid from https://github.com/michuanhaohao/reid-strong-baseline
554325746@qq.com
2020-03-24 495ffcdad0027be02d5fc82825e08f36b6a53b90
整理代码缩小收发缓存大小节省内存
2个文件已修改
209 ■■■■ 已修改文件
rpc/recv.go 107 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rpc/send.go 102 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rpc/recv.go
@@ -12,17 +12,20 @@
// Reciever receives payloads from an IPC endpoint and forwards them to a channel.
//
// NOTE(review): this span comes from a rendered diff whose +/- markers were
// stripped, so two declarations of out appear below (presumably the removed
// and the added one) — confirm against rpc/recv.go which one is current.
type Reciever struct {
    // ctx is not populated by the visible NewReciever literal.
    ctx    context.Context
    // ipcURL is the endpoint address passed to the deliver constructors.
    ipcURL string
    // out, send-only form: the receive loop pushes payloads into it.
    out    chan<- []byte
    // out, bidirectional form: the receive loop also reads from it to discard
    // backlog once it is more than half full.
    out    chan []byte
    // chCap is cap(out), captured once at construction.
    chCap  int
    // shm selects deliver.Shm; otherwise deliver.PushPull is used.
    shm      bool
    // fnLogger is the caller-supplied variadic logging callback.
    fnLogger func(...interface{})
}
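// NewReciever builds a Reciever for the IPC endpoint url: received payloads are
// pushed to out, shm selects the shared-memory transport, and fn is the logging callback.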
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        out:      out,
        ipcURL: url,
        out:    out,
        chCap:  cap(out),
        shm:      shm,
        fnLogger: fn,
    }
@@ -31,16 +34,9 @@
// Run runs the IPC client receive loop.
func (r *Reciever) Run(ctx context.Context) {
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
    }
}
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
    count := 0
    i := r.createIPC(ctx, 50*time.Millisecond, 200)
    for {
        select {
@@ -50,39 +46,33 @@
        default:
            if r.shm {
                if i == nil {
                    r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
                    i = r.createIPC(ctx, 50*time.Millisecond, 10)
                }
                if i == nil {
                    r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
                    continue
                }
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
                    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
                loopR:
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        default:
                            if err == nil {
                                break loopR
                            }
                            time.Sleep(time.Second)
                            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
                            r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
                        }
                    }
                    i = c
                    r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
                    i.Close()
                    r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
                    i = r.createIPC(ctx, time.Hour, -1)
                    r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        if len(r.out) > r.chCap/2 {
                            for i := 0; i < r.chCap/2; i++ {
                                <-r.out
                            }
                        }
                        if len(d) > 2 {
                            r.out <- d
                        }
                        r.out <- d
                        r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d))
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
@@ -93,6 +83,11 @@
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    if len(msg) > 2 {
                        if len(r.out) > r.chCap/2 {
                            for i := 0; i < r.chCap/2; i++ {
                                <-r.out
                            }
                        }
                        r.out <- msg
                    }
                }
@@ -102,22 +97,36 @@
    }
}
func (r *Reciever) runShm(ctx context.Context) {
    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
loopRBegin:
func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
    mode := deliver.PushPull
    if r.shm {
        mode = deliver.Shm
    }
    try := 0
    c, err := deliver.NewClientWithError(mode, r.ipcURL)
loopR:
    for {
        select {
        case <-ctx.Done():
            return
            return nil
        default:
            if err == nil {
                break loopRBegin
                break loopR
            }
            time.Sleep(1 * time.Second)
            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
            r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
            if loop > 0 {
                try++
                if try > loop {
                    return nil
                }
                time.Sleep(wait)
            } else {
                time.Sleep(time.Second)
            }
            c, err = deliver.NewClientWithError(mode, r.ipcURL)
        }
    }
    r.run(ctx, c)
    return c
}
rpc/send.go
@@ -11,16 +11,19 @@
// Sender reads payloads from a channel and sends them to an IPC endpoint.
//
// NOTE(review): this span comes from a rendered diff whose +/- markers were
// stripped, so shm appears twice below (presumably the removed and the
// re-aligned added line) — confirm against rpc/send.go which one is current.
type Sender struct {
    // ipcURL is the endpoint address passed to the deliver constructors.
    ipcURL string
    // in is the receive-only channel the send loop takes payloads from.
    in     <-chan []byte
    // shm selects deliver.Shm; otherwise deliver.PushPull is used.
    shm    bool
    // chCap is cap(in), captured once at construction; the send loop discards
    // backlog when more than half of it is queued.
    chCap  int
    shm      bool
    // fnLogger is the caller-supplied variadic logging callback.
    fnLogger func(...interface{})
}
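// NewSender builds a Sender that reads payloads from in and sends them to the IPC
// endpoint ipcURL; shm selects the shared-memory transport and fn is the logging callback.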
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
    return &Sender{
        ipcURL:   ipcURL,
        in:       in,
        ipcURL: ipcURL,
        in:     in,
        chCap:  cap(in),
        shm:      shm,
        fnLogger: fn,
    }
@@ -29,19 +32,7 @@
// Run runs the IPC producer loop, sending payloads read from the input channel.
func (s *Sender) Run(ctx context.Context) {
    if s.shm {
        s.runShm(ctx)
    } else {
        i := deliver.NewServer(deliver.PushPull, s.ipcURL)
        if i == nil {
            s.fnLogger("sender 2 pubsub nng create error")
            return
        }
        s.run(ctx, i)
    }
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
    i := s.createIPC(ctx, 50*time.Millisecond, 200)
    for {
        select {
@@ -50,33 +41,38 @@
            return
        case d := <-s.in:
            if len(s.in) > s.chCap/2 {
                for i := 0; i < s.chCap/2; i++ {
                    <-s.in
                }
            }
            if len(s.in) > 0 {
                d = <-s.in
            }
            if s.shm {
                if i == nil {
                    s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL)
                    i = s.createIPC(ctx, 50*time.Millisecond, 10)
                }
                if i == nil {
                    s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL)
                    continue
                }
                t := time.Now()
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
                    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
                loopS:
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        default:
                            if err == nil {
                                break loopS
                            }
                            time.Sleep(time.Second)
                            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
                            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
                        }
                    }
                    i = c
                    s.fnLogger("Sender Create Shm:", s.ipcURL)
                    i = s.createIPC(ctx, time.Hour, -1)
                    s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL)
                } else {
                    s.fnLogger("~~~~~~ shm send to reid len: ", len(d))
                }
                s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t))
            } else {
                err := i.Send(d)
                if err != nil {
@@ -91,22 +87,36 @@
    }
}
func (s *Sender) runShm(ctx context.Context) {
    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
loopSBegin:
func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
    mode := deliver.PushPull
    if s.shm {
        mode = deliver.Shm
    }
    try := 0
    c, err := deliver.NewClientWithError(mode, s.ipcURL)
loopR:
    for {
        select {
        case <-ctx.Done():
            return
            return nil
        default:
            if err == nil {
                break loopSBegin
                break loopR
            }
            time.Sleep(1 * time.Second)
            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
            if loop > 0 {
                try++
                if try > loop {
                    return nil
                }
                time.Sleep(wait)
            } else {
                time.Sleep(time.Second)
            }
            c, err = deliver.NewClientWithError(mode, s.ipcURL)
        }
    }
    s.run(ctx, c)
    return c
}