| | |
| | | "basic.com/dbapi.git" |
| | | "basic.com/valib/logger.git" |
| | | "fmt" |
| | | "github.com/golang/protobuf/proto" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/rep" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | | "nanomsg.org/go-mangos/transport/ipc" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | "ruleprocess/structure" |
| | | "time" |
| | | ) |
| | | |
| | | |
// urlPool maps a push target URL to the channel that carries the payloads
// destined for that URL. Payloads are raw byte slices that are handed to the
// socket unchanged. The 100 is only the initial size hint for the map; the
// channels themselves are created when a URL is registered.
// NOTE(review): the map is read and written from several goroutines in the
// visible code without a lock — confirm whether registration can race with
// the push loops.
var urlPool = make(map[string]chan []byte, 100)
| | | //var urlChans = make([]urlChan,100) |
| | | //type urlChan struct { |
| | | // url string |
| | | // ch chan *structure.ResultMsg |
| | | //} |
| | | //var pool chan *structure.ResultMsg = make(chan *structure.ResultMsg) |
| | | func Die(format string, v ...interface{}) { |
| | | logger.Info("+++++++",format) |
| | | logger.Info("+++++++",format,v) |
| | | //os.Exit(1) |
| | | } |
| | | |
| | |
| | | if ruleGroup.Enable { // 大规则开关开启状态 |
| | | for _, url := range ruleGroup.Urls { |
| | | // 为每个url建立一个chan |
| | | urlPool[url.Url] = make(chan structure.ResultMsg) |
| | | go GoPush(url.Url) |
| | | if urlPool[url.Url] == nil { |
| | | urlPool[url.Url] = make(chan []byte,100) |
| | | logger.Info("初始化信息:",urlPool) |
| | | go GoPush(url.Url) |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | logger.Error("接收响应超时") |
| | | return |
| | | } |
| | | errTimeOut1 := sock.SetOption(mangos.OptionSendDeadline,time.Millisecond * 2000) |
| | | if errTimeOut1 != nil { |
| | | logger.Error("发送超时") |
| | | return |
| | | } |
| | | errWrite := sock.SetOption(mangos.OptionWriteQLen,5) |
| | | if errWrite != nil { |
| | | logger.Error("设置传输缓存大小失败") |
| | | return |
| | | } |
| | | errRead := sock.SetOption(mangos.OptionReadQLen,5) |
| | | if errRead != nil { |
| | | logger.Error("设置传输缓存大小失败") |
| | | return |
| | | } |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | if err = sock.Dial("tcp://"+url); err != nil { |
| | | logger.Error("请求socket拨号失败: ", err.Error()) |
| | | } |
| | | logger.Info("序列化数据") |
| | | |
| | | //for v := range pool{ |
| | | // logger.Info("无限循环",v.Cid) |
| | | //} |
| | | //var ch chan *structure.ResultMsg |
| | | //for _, v := range urlChans { |
| | | // if v.url == url{ |
| | | // ch = v.ch |
| | | // } |
| | | //} |
| | | logger.Info("chan信息:",urlPool[url]) |
| | | for { |
| | | select { |
| | | // case <-ctx.Done(): |
| | | // return |
| | | case data := <- urlPool[url]: |
| | | |
| | | bytes,err1 := proto.Marshal(data) |
| | | logger.Info("数据长度为:",len(bytes)) |
| | | if err1 != nil { |
| | | logger.Info("序列化失败:",err1) |
| | | } |
| | | //logger.Info("接收到数据",data.Cid) |
| | | //bytes,err1 := proto.Marshal(data) |
| | | logger.Info("数据长度为:",len(data)) |
| | | //if err1 != nil { |
| | | // logger.Info("序列化失败:",err1) |
| | | //} |
| | | start := time.Now() |
| | | logger.Debug("groutine"+url+"推送数据") |
| | | //bytes := []byte("ndfasojdfaidsos") |
| | | if err = sock.Send(bytes); err != nil { |
| | | if err = sock.Send(data); err != nil { |
| | | Die("groutine"+url+"推送socket发送数据失败: ", err.Error()) |
| | | } |
| | | pushTime := time.Since(start) |
| | | msg, err = sock.Recv(); |
| | | if err != nil { |
| | | Die("groutine"+url+"接收响应失败: ", err.Error()) |
| | | Die("groutine"+url+"接收响应失败: ", err.Error(),pushTime,time.Since(start)) |
| | | } else { |
| | | logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg)) |
| | | logger.Debug("事件推送成功!groutine"+url+"收到响应",string(msg),pushTime,time.Since(start)) |
| | | } |
| | | |
| | | default: |