| | |
| | | |
| | | import ( |
| | | "fmt" |
| | | "github.com/gogo/protobuf/proto" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/rep" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | | "nanomsg.org/go-mangos/transport/ipc" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | "os" |
| | | "github.com/golang/protobuf/proto" |
| | | "ruleprocess/logger" |
| | | "ruleprocess/ruleserver" |
| | | "time" |
| | | ) |
| | | |
| | | func Die(format string, v ...interface{}) { |
| | | fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...)) |
| | | logger.Info("+++++++",format) |
| | | os.Exit(1) |
| | | } |
| | |
// date returns the current local time rendered with the time.ANSIC layout,
// for example "Mon Jan  2 15:04:05 2006".
func date() string {
	return time.Now().Format(time.ANSIC)
}
| | | |
| | | func Node0(url string) { |
| | | var msgChan chan []byte |
| | | func Receive(url string) { |
| | | var sock mangos.Socket |
| | | var err error |
| | | var msg []byte |
| | | msgChan = make(chan []byte,200) |
| | | if sock, err = rep.NewSocket(); err != nil { |
| | | Die("can't get new rep socket: %s", err) |
| | | } |
| | |
| | | for { |
| | | // Could also use sock.RecvMsg to get header |
| | | msg, err = sock.Recv() |
| | | if string(msg) == "DATE" { // no need to terminate |
| | | fmt.Println("NODE0: RECEIVED DATE REQUEST") |
| | | if msg != nil { // no need to terminate |
| | | fmt.Println("Received Data request") |
| | | // 把收到的msg塞进通道 |
| | | msgChan <- msg |
| | | // 给发送程序反馈信息 |
| | | d := date() |
| | | fmt.Printf("NODE0: SENDING DATE %s\n", d) |
| | | err = sock.Send([]byte(d)) |
| | | err = sock.Send([]byte("Received Data, --"+d)) |
| | | if err != nil { |
| | | Die("can't send reply: %s", err.Error()) |
| | | } |
| | |
| | | if sock, err = req.NewSocket(); err != nil { |
| | | Die("创建请求socket失败: %s", err.Error()) |
| | | } |
| | | errSize := sock.SetOption(mangos.OptionMaxRecvSize,5*1024*1024) |
| | | if errSize != nil { |
| | | fmt.Errorf("Failed set MaxRecvSize: %v", err) |
| | | return |
| | | } |
| | | //sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | if err = sock.Dial(url); err != nil { |
| | | Die("请求socket拨号失败: %s", err.Error()) |
| | | } |
| | | logger.Info("序列化数据") |
| | | bytes,err1 := proto.Marshal(data) |
| | | logger.Info("数据长度为:",len(bytes)) |
| | | if err1 != nil { |
| | | logger.Info("序列化失败:",err1) |
| | | } |
| | | logger.Debug("推送数据") |
| | | bytes,err := proto.Marshal(data) |
| | | //bytes := []byte("ndfasojdfaidsos") |
| | | if err = sock.Send(bytes); err != nil { |
| | | Die("推送socket发送数据失败: %s", err.Error()) |
| | | } |