reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-02-25 0786441ed1828c411a16d6648baee753a02a3ddb
rpc/send.go
@@ -1,112 +1,112 @@
package rpc
import (
	"context"
	"time"

	"basic.com/valib/deliver.git"
)
// Sender forwards byte payloads received on a channel to an IPC endpoint
// created through the deliver package.
type Sender struct {
	ipcURL string        // endpoint address passed to the deliver constructors
	in     <-chan []byte // source of the payloads to send
	shm    bool          // true selects the shared-memory transport

	fnLogger func(...interface{}) // logging callback supplied by the caller
}
// NewSender Sender
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
   return &Sender{
      ipcURL:   ipcURL,
      in:       in,
      shm:      shm,
      fnLogger: fn,
   }
    return &Sender{
        ipcURL:   ipcURL,
        in:       in,
        shm:      shm,
        fnLogger: fn,
    }
}
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
   if s.shm {
      s.runShm(ctx)
   } else {
      i := deliver.NewServer(deliver.PushPull, s.ipcURL)
      if i == nil {
         s.fnLogger("sender 2 pubsub nng create error")
         return
      }
      s.run(ctx, i)
   }
    if s.shm {
        s.runShm(ctx)
    } else {
        i := deliver.NewServer(deliver.PushPull, s.ipcURL)
        if i == nil {
            s.fnLogger("sender 2 pubsub nng create error")
            return
        }
        s.run(ctx, i)
    }
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
   for {
      select {
      case <-ctx.Done():
         i.Close()
         return
      case d := <-s.in:
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        case d := <-s.in:
         if s.shm {
            if err := i.Send(d); err != nil {
               i.Close()
               s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
            if s.shm {
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
               c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
            loopS:
               for {
                  select {
                  case <-ctx.Done():
                     return
                  default:
                     if err == nil {
                        break loopS
                     }
                     time.Sleep(time.Second)
                     c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
                     s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
                  }
                    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
                loopS:
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        default:
                            if err == nil {
                                break loopS
                            }
                            time.Sleep(time.Second)
                            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
                            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
                        }
               }
                    }
               i = c
               s.fnLogger("Sender Create Shm:", s.ipcURL)
            } else {
                    i = c
                    s.fnLogger("Sender Create Shm:", s.ipcURL)
                } else {
            }
         } else {
            err := i.Send(d)
            if err != nil {
               // logo.Errorln("error sender 2 pubsub: ", err)
            } else {
               s.fnLogger("mangos send to pubsub len: ", len(d))
            }
         }
      default:
         time.Sleep(10 * time.Millisecond)
      }
   }
                }
            } else {
                err := i.Send(d)
                if err != nil {
                    // logo.Errorln("error sender 2 pubsub: ", err)
                } else {
                    s.fnLogger("mangos send to pubsub len: ", len(d))
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) runShm(ctx context.Context) {
   c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
loopSBegin:
   for {
      select {
      case <-ctx.Done():
         return
      default:
         if err == nil {
            break loopSBegin
         }
         time.Sleep(1 * time.Second)
         c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
         s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
      }
   }
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if err == nil {
                break loopSBegin
            }
            time.Sleep(1 * time.Second)
            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
        }
    }
   s.run(ctx, c)
    s.run(ctx, c)
}