zhangmeng
2019-05-17 22125ca10867152617cc4f42f403a0f6e37648a4
main.go
@@ -4,114 +4,52 @@
   "demo/deliver"
   "fmt"
   "os"
   "os/signal"
   "strconv"
   "time"
   "golang.org/x/sys/unix"
)
const dLen = 12 * 1024 * 1024
var mode = deliver.PushPull
func modeType(t string) deliver.Mode {
func senderImpl(s deliver.Deliver) {
   var err error
   buf := make([]byte, dLen)
   for {
      if err = s.Send(buf); err != nil {
         fmt.Printf("can't send message on push socket: %s\n", err.Error())
      } else {
         fmt.Printf("send msg length %d\n", len(buf))
      }
      // time.Sleep(10 * time.Millisecond)
   }
}
func sender(url string, args ...interface{}) {
   s := deliver.NewServer(deliver.Mode(mode), url, args...)
   go senderImpl(s)
   c := make(chan os.Signal, 1)
   signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
   <-c
   s.Close()
}
func recvImpl(url string, index int) {
   c := deliver.NewClient(deliver.Mode(mode), url)
   var msg []byte
   var err error
   var t int64
   var elapse int64
   count := 0
   for {
      msg, err = c.Recv()
      if err != nil {
         fmt.Println("recv error : ", err)
      }
      if t == 0 {
         t = time.Now().UnixNano()
      }
      elapse = time.Now().UnixNano() - t
      count++
      if elapse > 1e9 {
         fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
            index, count, len(msg), elapse)
         elapse = 0
         count = 0
         t = 0
      }
      // time.Sleep(10 * time.Millisecond)
   }
}
func reciever(url string, strCount string) {
   count, _ := strconv.Atoi(strCount)
   for i := 0; i < count; i++ {
      go recvImpl(url, i)
   }
   for {
      time.Sleep(2 * time.Second)
   }
}
func modeType(t string) {
   if t == "pushpull" {
      mode = deliver.PushPull
      return deliver.PushPull
   } else if t == "pubsub" {
      mode = deliver.PubSub
      return deliver.PubSub
   } else if t == "pair" {
      mode = deliver.Pair
      return deliver.Pair
   } else if t == "reqrep" {
      return deliver.ReqRep
   }
   return deliver.Mode(-1)
}
func senderMode(ipc string, m deliver.Mode) {
   if m == deliver.ReqRep {
      req(ipc, m)
   }
   sender(ipc, m)
}
func recvMode(ipc string, m deliver.Mode, strCount string) {
   if m == deliver.ReqRep {
      rep(ipc, m)
   }
   reciever(ipc, m, strCount)
}
func main() {
   if len(os.Args) > 3 && os.Args[1] == "producer" {
      modeType(os.Args[2])
      sender(os.Args[3])
      m := modeType(os.Args[2])
      if m > deliver.ModeStart {
         senderMode(os.Args[3], m)
      }
      os.Exit(0)
   }
   if len(os.Args) > 3 && os.Args[1] == "consumer" {
      modeType(os.Args[2])
      reciever(os.Args[3], os.Args[4])
      m := modeType(os.Args[2])
      if m > deliver.ModeStart {
         recvMode(os.Args[3], m, os.Args[4])
      }
      os.Exit(0)
   }
   fmt.Fprintf(os.Stderr,