New file |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "time" |
| | | |
| | | "golang.org/x/sys/unix" |
| | | ) |
| | | |
| | | func oneSenderImpl(s deliver.Deliver) { |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | |
| | | if err = s.Send(buf); err != nil { |
| | | |
| | | fmt.Printf("can't send message on push socket: %s\n", err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("send msg length %d\n", len(buf)) |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | func oneSender(url string, m deliver.Mode, args ...interface{}) { |
| | | s := deliver.NewServer(m, url, args...) |
| | | |
| | | go oneSenderImpl(s) |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | s.Close() |
| | | } |
| | | |
| | | func nRecvImpl(c deliver.Deliver, index int) { |
| | | |
| | | var msg []byte |
| | | var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | msg, err = c.Recv() |
| | | if err != nil { |
| | | fmt.Println("recv error : ", err) |
| | | } |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | index, count, len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | func nReciever(url string, m deliver.Mode, count int) { |
| | | |
| | | var cs []deliver.Deliver |
| | | |
| | | for i := 0; i < count; i++ { |
| | | c := deliver.NewClient(m, url) |
| | | cs = append(cs, c) |
| | | |
| | | go nRecvImpl(c, i) |
| | | } |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | for _, v := range cs { |
| | | v.Close() |
| | | } |
| | | |
| | | } |
| | |
| | | import ( |
| | | "context" |
| | | "demo/deliver" |
| | | "flag" |
| | | "fmt" |
| | | "os" |
| | | ) |
| | |
// dLen is the size in bytes (12 MiB) of the buffer allocated by the sender
// loops.
const dLen = 12 << 20

// ctx is the package-wide context the send/receive loops poll to know when to
// stop; cancel cancels it.
var (
	ctx, cancel = context.WithCancel(context.Background())
)
| | | |
| | | func senderMode(ipc string, m deliver.Mode, count int, one bool) { |
| | | if m == deliver.ReqRep { |
| | | req(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmSender(ipc, 2, 32*1024*1024) |
| | | } |
| | | |
| | | if one { |
| | | oneSender(ipc, m) |
| | | } else { |
| | | nSender(ipc, m, count) |
| | | } |
| | | } |
| | | |
| | | func recvMode(ipc string, m deliver.Mode, count int, n bool) { |
| | | if m == deliver.ReqRep { |
| | | rep(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmReciever(ipc, count) |
| | | } |
| | | |
| | | if n { |
| | | nReciever(ipc, m, count) |
| | | } else { |
| | | oneReciever(ipc, m) |
| | | } |
| | | } |
| | | |
// Command-line settings; init binds each of them to a flag.
var (
	// proc selects the role of this process (-p); compared against act.
	proc string
	// mode names the delivery mode to run (-m).
	mode string
	// ipc is the endpoint label/URL (-i).
	ipc string
	// procCount is the run count (-c).
	procCount int
	// oneSendnRecv toggles the "one send n recv" layout (-n).
	oneSendnRecv bool
)

// Values accepted for proc.
const (
	act  = "act"
	pass = "pass"
)
| | | |
| | | func init() { |
| | | flag.StringVar(&proc, "p", "act", "proc as sender") |
| | | flag.IntVar(&procCount, "c", 1, "proc run count") |
| | | |
| | | flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.") |
| | | |
| | | flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label") |
| | | |
| | | flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv") |
| | | } |
| | | |
| | | func modeType(t string) deliver.Mode { |
| | | |
| | |
| | | return deliver.NONE |
| | | } |
| | | |
| | | func senderMode(ipc string, m deliver.Mode) { |
| | | if m == deliver.ReqRep { |
| | | req(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmSender(ipc, 2, 32*1024*1024) |
| | | } |
| | | sender(ipc, m) |
| | | } |
| | | |
| | | func recvMode(ipc string, m deliver.Mode, strCount string) { |
| | | if m == deliver.ReqRep { |
| | | rep(ipc, m) |
| | | } else if m == deliver.Shm { |
| | | shmReciever(ipc, strCount) |
| | | } |
| | | reciever(ipc, m, strCount) |
| | | } |
| | | |
| | | func main() { |
| | | if len(os.Args) > 3 && os.Args[1] == "producer" { |
| | | m := modeType(os.Args[2]) |
| | | if m > deliver.ModeStart { |
| | | senderMode(os.Args[3], m) |
| | | flag.Parse() |
| | | |
| | | m := modeType(mode) |
| | | if m > deliver.ModeStart { |
| | | if proc == act { |
| | | senderMode(ipc, m, procCount, oneSendnRecv) |
| | | } else { |
| | | recvMode(ipc, m, procCount, oneSendnRecv) |
| | | } |
| | | os.Exit(0) |
| | | } |
| | | if len(os.Args) > 3 && os.Args[1] == "consumer" { |
| | | m := modeType(os.Args[2]) |
| | | if m > deliver.ModeStart { |
| | | recvMode(os.Args[3], m, os.Args[4]) |
| | | } |
| | | os.Exit(0) |
| | | } |
| | | |
| | | fmt.Fprintf(os.Stderr, |
| | | "Usage: pushpull push|pull <URL> <ARG> ...\n") |
| | | os.Exit(1) |
New file |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "demo/deliver" |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "time" |
| | | |
| | | "golang.org/x/sys/unix" |
| | | ) |
| | | |
| | | func nSenderImpl(s deliver.Deliver, index int) { |
| | | var err error |
| | | |
| | | buf := make([]byte, dLen) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err = s.Send(buf); err != nil { |
| | | |
| | | fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error()) |
| | | } else { |
| | | |
| | | fmt.Printf("%d send msg length %d\n", index, len(buf)) |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | func nSender(url string, m deliver.Mode, count int, args ...interface{}) { |
| | | |
| | | var cs []deliver.Deliver |
| | | |
| | | for i := 0; i < count; i++ { |
| | | c := deliver.NewClient(m, url, args...) |
| | | cs = append(cs, c) |
| | | |
| | | go nSenderImpl(c, i) |
| | | } |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | cancel() |
| | | for _, v := range cs { |
| | | v.Close() |
| | | } |
| | | |
| | | } |
| | | |
| | | func oneRecvImpl(c deliver.Deliver, index int) { |
| | | |
| | | var msg []byte |
| | | var err error |
| | | |
| | | var t int64 |
| | | var elapse int64 |
| | | count := 0 |
| | | |
| | | for { |
| | | msg, err = c.Recv() |
| | | if err != nil { |
| | | fmt.Println("recv error : ", err) |
| | | } |
| | | if t == 0 { |
| | | t = time.Now().UnixNano() |
| | | } |
| | | elapse = time.Now().UnixNano() - t |
| | | |
| | | count++ |
| | | |
| | | if elapse > 1e9 { |
| | | fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", |
| | | index, count, len(msg), elapse) |
| | | elapse = 0 |
| | | count = 0 |
| | | t = 0 |
| | | } |
| | | |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | func oneReciever(url string, m deliver.Mode) { |
| | | |
| | | s := deliver.NewServer(m, url) |
| | | |
| | | go oneRecvImpl(s, 0) |
| | | |
| | | c := make(chan os.Signal, 1) |
| | | signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) |
| | | <-c |
| | | |
| | | cancel() |
| | | s.Close() |
| | | } |
| | |
| | | "fmt" |
| | | "os" |
| | | "os/signal" |
| | | "strconv" |
| | | "time" |
| | | |
| | | "golang.org/x/sys/unix" |
| | |
| | | } |
| | | } |
| | | |
| | | func shmReciever(url string, strCount string) { |
| | | count, _ := strconv.Atoi(strCount) |
| | | func shmReciever(url string, count int) { |
| | | |
| | | var cs []deliver.Deliver |
| | | for i := 0; i < count; i++ { |