// Package labelFilter contains small helpers built on mangos REQ/REP sockets:
// a demo date-reply server (Node0) and a client (Push) that sends a
// protobuf-encoded ruleserver.ResultMsg and waits for the reply.
package labelFilter

import (
	"fmt"
	"os"
	"time"

	"github.com/gogo/protobuf/proto"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/rep"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"

	"ruleprocess/logger"
	"ruleprocess/ruleserver"
)

// Die formats the message, writes it to stderr and to the logger, and then
// terminates the process with exit status 1. It never returns.
func Die(format string, v ...interface{}) {
	// Format once so stderr and the log carry the same, fully expanded text
	// (previously the log only received the raw format template).
	message := fmt.Sprintf(format, v...)
	fmt.Fprintln(os.Stderr, message)
	logger.Info("+++++++", message)
	os.Exit(1)
}

// date returns the current local time rendered in time.ANSIC layout.
func date() string {
	return time.Now().Format(time.ANSIC)
}

// Node0 opens a REP socket listening on url (ipc and tcp transports are
// registered) and serves requests forever. A request whose body is exactly
// "DATE" is answered with the current date; any other request is ignored
// and receives no reply. Any socket error terminates the process via Die.
func Node0(url string) {
	sock, err := rep.NewSocket()
	if err != nil {
		Die("can't get new rep socket: %s", err)
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Listen(url); err != nil {
		Die("can't listen on rep socket: %s", err.Error())
	}

	for {
		// Could also use sock.RecvMsg to get header
		msg, err := sock.Recv()
		if err != nil {
			// Without this check a closed or broken socket would make the
			// loop spin forever on failing Recv calls.
			Die("can't receive on rep socket: %s", err.Error())
		}
		if string(msg) != "DATE" {
			continue
		}

		// no need to terminate
		fmt.Println("NODE0: RECEIVED DATE REQUEST")
		d := date()
		fmt.Printf("NODE0: SENDING DATE %s\n", d)
		if err = sock.Send([]byte(d)); err != nil {
			Die("can't send reply: %s", err.Error())
		}
	}
}

// Push dials url with a REQ socket over tcp, sends data encoded with
// proto.Marshal, and blocks until a reply is received. Every failure
// (socket creation, dial, marshal, send, receive) terminates the process
// via Die; on success the socket is closed before returning.
func Push(url string, data ruleserver.ResultMsg) {
	sock, err := req.NewSocket()
	if err != nil {
		Die("创建请求socket失败: %s", err.Error())
	}
	// Close on the normal return path. Die exits the process, so failure
	// paths do not rely on this defer.
	defer sock.Close()

	//sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		Die("请求socket拨号失败: %s", err.Error())
	}

	logger.Debug("推送数据")
	payload, err := proto.Marshal(data)
	if err != nil {
		// Previously this error was silently overwritten by the Send call
		// below, which would then transmit an empty payload.
		Die("序列化数据失败: %s", err.Error())
	}
	if err = sock.Send(payload); err != nil {
		Die("推送socket发送数据失败: %s", err.Error())
	}

	reply, err := sock.Recv()
	if err != nil {
		Die("接收响应失败: %s", err.Error())
	}
	logger.Debug("数据推送成功!收到响应", string(reply))
}

// NOTE(review): the example below looks stale — it passes a string, while
// Push takes a ruleserver.ResultMsg. Kept for reference only.
//func main() {
//	url := "tcp://192.168.1.123:40011"
//	Push(url,"hahahaha")
//}