package labelFilter import ( "basic.com/dbapi.git" "basic.com/valib/logger.git" "fmt" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/rep" "nanomsg.org/go-mangos/protocol/req" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" "time" ) var urlPool = make(map[string]chan []byte,100) //var urlChans = make([]urlChan,100) //type urlChan struct { // url string // ch chan *structure.ResultMsg //} //var pool chan *structure.ResultMsg = make(chan *structure.ResultMsg) func Die(format string, v ...interface{}) { logger.Info("+++++++",format,v) //os.Exit(1) } func date() string { return time.Now().Format(time.ANSIC) } var msgChan chan []byte func Receive(url string) { var sock mangos.Socket var err error var msg []byte msgChan = make(chan []byte,200) if sock, err = rep.NewSocket(); err != nil { Die("can't get new rep socket: %s", err) } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { Die("can't listen on rep socket: %s", err.Error()) } for { // Could also use sock.RecvMsg to get header msg, err = sock.Recv() if msg != nil { // no need to terminate fmt.Println("Received Data request") // 把收到的msg塞进通道 msgChan <- msg // 给发送程序反馈信息 d := date() err = sock.Send([]byte("Received Data, --"+d)) if err != nil { Die("can't send reply: %s", err.Error()) } } } } func Init(){ var api dbapi.EventPushApi b, allRules := api.FindAllDetails() logger.Info("初始化事件推送,查看所有规则组:", allRules) if !b { logger.Error("查询时间推送规则失败!") } for _, ruleGroup := range allRules { if ruleGroup.Enable { // 大规则开关开启状态 for _, url := range ruleGroup.Urls { // 为每个url建立一个chan if urlPool[url.Url] == nil { urlPool[url.Url] = make(chan []byte,100) logger.Info("初始化信息:",urlPool) go GoPush(url.Url) } } } } } func GoPush(url string) { var err error var msg []byte var sock mangos.Socket if sock, err = req.NewSocket(); err != nil { Die("创建请求socket失败: %s", err.Error()) } errSize := sock.SetOption(mangos.OptionMaxRecvSize,30*1024*1024) if errSize != nil { logger.Error("传输的数据超过大小限制") return } errTimeOut := sock.SetOption(mangos.OptionRecvDeadline,time.Millisecond * 1500) if errTimeOut != nil { logger.Error("接收响应超时") return } errTimeOut1 := sock.SetOption(mangos.OptionSendDeadline,time.Millisecond * 1500) if errTimeOut1 != nil { logger.Error("发送超时") return } errWrite := sock.SetOption(mangos.OptionWriteQLen,5) if errWrite != nil { logger.Error("设置传输缓存大小失败") return } errRead := sock.SetOption(mangos.OptionReadQLen,5) if errRead != nil { logger.Error("设置传输缓存大小失败") return } sock.AddTransport(tcp.NewTransport()) if err = sock.Dial("tcp://"+url); err != nil { logger.Error("请求socket拨号失败: ", err.Error()) } logger.Info("序列化数据") //for v := range pool{ // logger.Info("无限循环",v.Cid) //} //var ch chan *structure.ResultMsg //for _, v := range urlChans { // if v.url == url{ // ch = v.ch // } //} logger.Info("chan信息:",urlPool[url]) for { select { // case <-ctx.Done(): // return case data := <- urlPool[url]: //logger.Info("接收到数据",data.Cid) //bytes,err1 := proto.Marshal(data) go func(data []byte) { logger.Info("数据长度为:",len(data)) //if err1 != nil { // logger.Info("序列化失败:",err1) //} start := time.Now() logger.Debug("groutine"+url+"推送数据") //bytes := []byte("ndfasojdfaidsos") if err = sock.Send(data); err != nil { Die("groutine"+url+"推送socket发送数据失败: ", err.Error()) } pushTime := time.Since(start) msg, err = sock.Recv(); if err != nil { Die("groutine"+url+"接收响应失败: ", err.Error(),pushTime,time.Since(start)) } else { logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg),pushTime,time.Since(start)) } }(data) default: } } sock.Close() } //func main() { // url := "tcp://192.168.1.123:40011" // Push(url,"hahahaha") //}