// Package labelFilter provides a mangos REQ/REP based transport: Receive
// listens on a REP socket and queues incoming payloads, and Push sends a
// protobuf-serialized structure.ResultMsg over a REQ socket.
package labelFilter

import (
	"fmt"
	"time"

	"basic.com/valib/logger.git"
	"github.com/golang/protobuf/proto"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/rep"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
	"ruleprocess/structure"
)

// Die logs a formatted error message through the project logger.
//
// Despite its name it does NOT terminate the process (the os.Exit call was
// deliberately disabled), so callers must return on their own after calling
// it when they cannot continue.
func Die(format string, v ...interface{}) {
	text := format
	if len(v) > 0 {
		// Expand the arguments; previously they were silently dropped and
		// only the raw format string reached the log.
		text = fmt.Sprintf(format, v...)
	}
	logger.Info("+++++++", text)
}

// date returns the current local time formatted with the time.ANSIC layout.
func date() string {
	return time.Now().Format(time.ANSIC)
}

// msgChan buffers the raw payloads accepted by Receive. It is (re)created
// each time Receive is called.
var msgChan chan []byte

// Receive listens on url with a REP socket (ipc and tcp transports enabled)
// and loops until the socket is closed. Every received payload is pushed onto
// msgChan and acknowledged to the sender with a timestamped reply.
//
// Receive blocks; it returns only when the socket cannot be created, cannot
// listen on url, or has been closed.
func Receive(url string) {
	msgChan = make(chan []byte, 200)

	sock, err := rep.NewSocket()
	if err != nil {
		Die("can't get new rep socket: %s", err)
		return // a nil socket cannot be used below
	}
	defer sock.Close()

	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Listen(url); err != nil {
		Die("can't listen on rep socket: %s", err.Error())
		return
	}

	for {
		// Could also use sock.RecvMsg to get header
		msg, err := sock.Recv()
		if err != nil {
			Die("can't receive on rep socket: %s", err.Error())
			if err == mangos.ErrClosed {
				// A closed socket never recovers; stop instead of spinning.
				return
			}
			continue
		}
		if msg == nil {
			continue
		}

		// no need to terminate
		fmt.Println("Received Data request")
		// Queue the received message for consumers of msgChan.
		msgChan <- msg
		// Acknowledge receipt to the sender (REP must answer every request).
		if err = sock.Send([]byte("Received Data, --" + date())); err != nil {
			Die("can't send reply: %s", err.Error())
		}
	}
}

// Push serializes data with protobuf and sends it to url over a REQ socket
// (tcp transport only), then waits up to two seconds for the peer's reply.
//
// Failures at any step are logged via Die and abort the push; the socket is
// always closed before Push returns.
func Push(url string, data structure.ResultMsg) {
	sock, err := req.NewSocket()
	if err != nil {
		Die("创建请求socket失败: %s", err.Error())
		return
	}
	defer sock.Close()

	if err = sock.SetOption(mangos.OptionMaxRecvSize, 30*1024*1024); err != nil {
		Die("Failed set MaxRecvSize: %v", err)
		return
	}
	if err = sock.SetOption(mangos.OptionRecvDeadline, time.Millisecond*2000); err != nil {
		Die("Failed set MaxRecvDeadline: %v", err)
		return
	}

	// Only TCP is enabled for pushing; the ipc transport is intentionally
	// left out here.
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		Die("请求socket拨号失败: %s", err.Error())
		return
	}

	logger.Info("序列化数据")
	payload, err := proto.Marshal(data)
	if err != nil {
		// Do not send a nil/partial payload when serialization fails.
		logger.Info("序列化失败:", err)
		return
	}
	logger.Info("数据长度为:", len(payload))

	logger.Debug("推送数据")
	if err = sock.Send(payload); err != nil {
		Die("推送socket发送数据失败: %s", err.Error())
		return
	}

	reply, err := sock.Recv()
	if err != nil {
		Die("接收响应失败: %s", err.Error())
		return
	}
	logger.Debug("事件推送成功!收到响应", string(reply))
}

//func main() {
//	url := "tcp://192.168.1.123:40011"
//	Push(url,"hahahaha")
//}