reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-02-25 0786441ed1828c411a16d6648baee753a02a3ddb
rpc/recv.go
@@ -1,123 +1,123 @@
package rpc
import (
   "context"
    "context"
   "time"
    "time"
   "basic.com/valib/deliver.git"
    "basic.com/valib/deliver.git"
)
// Reciever receives raw messages from a deliver endpoint and forwards them to
// an output channel.
//
// NOTE(review): the exported name is a misspelling of "Receiver"; it is kept
// as-is because renaming it would break callers.
type Reciever struct {
	// ctx is not assigned by NewReciever; the methods in this file take their
	// context as an explicit argument instead.
	ctx context.Context
	// ipcURL is the address passed to the deliver constructors.
	ipcURL string
	// out is the send-only channel that received payloads are forwarded to.
	out chan<- []byte

	// shm selects the deliver.Shm transport instead of deliver.PushPull.
	shm bool
	// fnLogger is called with print-style arguments for diagnostics.
	fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
   return &Reciever{
      ipcURL:   url,
      out:      out,
      shm:      shm,
      fnLogger: fn,
   }
    return &Reciever{
        ipcURL:   url,
        out:      out,
        shm:      shm,
        fnLogger: fn,
    }
}
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
   if r.shm {
      r.runShm(ctx)
   } else {
      r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
   }
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
    }
}
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
   count := 0
    count := 0
   for {
      select {
      case <-ctx.Done():
         i.Close()
         return
      default:
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        default:
         if r.shm {
            if d, err := i.Recv(); err != nil {
               i.Close()
               r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
            if r.shm {
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
               c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
            loopR:
               for {
                  select {
                  case <-ctx.Done():
                     return
                  default:
                     if err == nil {
                        break loopR
                     }
                     time.Sleep(time.Second)
                     c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
                     r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
                  }
               }
               i = c
               r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
            } else {
               if d != nil {
                  count++
                  if count > 10 {
                     count = 0
                     r.fnLogger("~~~shm recv image:", len(d))
                  }
                  if len(d) > 2 {
                     r.out <- d
                  }
               }
            }
         } else {
            if msg, err := i.Recv(); err != nil {
               // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
            } else {
               count++
               if count > 10 {
                  count = 0
                  r.fnLogger("~~~mangos recv image:", len(msg))
               }
               if len(msg) > 2 {
                  r.out <- msg
               }
            }
         }
         time.Sleep(10 * time.Millisecond)
      }
   }
                    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
                loopR:
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        default:
                            if err == nil {
                                break loopR
                            }
                            time.Sleep(time.Second)
                            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
                            r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
                        }
                    }
                    i = c
                    r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        if len(d) > 2 {
                            r.out <- d
                        }
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
                } else {
                    count++
                    if count > 10 {
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    if len(msg) > 2 {
                        r.out <- msg
                    }
                }
            }
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (r *Reciever) runShm(ctx context.Context) {
   c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
loopRBegin:
   for {
      select {
      case <-ctx.Done():
         return
      default:
         if err == nil {
            break loopRBegin
         }
         time.Sleep(1 * time.Second)
         c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
         r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
      }
   }
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err == nil {
                break loopRBegin
            }
            time.Sleep(1 * time.Second)
            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
            r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
        }
    }
   r.run(ctx, c)
    r.run(ctx, c)
}