| | |
| | | package ruleserver |
| | | |
| | | import ( |
| | | bigCache "basic.com/pubsub/cache.git" |
| | | "basic.com/pubsub/cache.git/esutil" |
| | | "basic.com/pubsub/protomsg.git" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "github.com/go-yaml/yaml" |
| | | "github.com/golang/protobuf/proto" |
| | | "io/ioutil" |
| | | "math" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | "net" |
| | | "os" |
| | | "ruleprocess/cache" |
| | | "ruleprocess/logger" |
| | | "github.com/golang/protobuf/proto" |
| | | "ruleprocess/structure" |
| | | "strconv" |
| | | "time" |
| | | ) |
| | |
| | | logger.Error("查询本机信息失败!") |
| | | } |
| | | logger.Debug("本机信息和server信息:", localConfig, serverIp, serverPort) |
| | | bigCache.Init(dbTablePersons, serverIp, serverPort, localConfig.ServerId) |
| | | sock, err = req.NewSocket(); |
| | | if err != nil { |
| | | logger.Error("创建请求socket失败: %s", err.Error()) |
| | | } |
| | | go Push1() |
| | | } |
| | | |
| | | var sender chan *protomsg.CompareArgs = make(chan *protomsg.CompareArgs) |
| | | var receiver chan []byte = make(chan []byte) |

// BaseInfo is a flat, JSON-serializable record; every field is emitted under
// the camelCase key given in its tag. All fields are strings except
// CompareScore, which is a float64.
//
// NOTE(review): presumably this describes one person matched in a base
// library together with the table it belongs to — confirm against callers.
type BaseInfo struct {
	TableId      string  `json:"tableId"`
	TableName    string  `json:"tableName"`
	BwType       string  `json:"bwType"`
	CompareScore float64 `json:"compareScore"`
	PersonId     string  `json:"personId"`
	PersonName   string  `json:"personName"`
	PersonPicUrl string  `json:"personPicUrl"`
	PhoneNum     string  `json:"phoneNum"`
	Sex          string  `json:"sex"`
	IdCard       string  `json:"idCard"`
	MonitorLevel string  `json:"monitorLevel"`
	Content      string  `json:"content"`
}
| | | |
| | | // 以摄像机id查出跟其相关的所有任务下的所有规则组 |
| | | func GetRuleGroup(cameraId string, taskId string) *protomsg.TaskGroupArgs { |
| | |
| | | CompareThreshold:compareThreshold, |
| | | Source:false, |
| | | } |
| | | serverIp, _ := GetLocalIP() |
| | | bytes := Push("tcp://"+serverIp+":40010",comArg,sock) |
| | | |
| | | bytes := getCompareMsg(comArg) |
| | | var scResult protomsg.SdkCompareResult |
| | | err1 := proto.Unmarshal(bytes, &scResult) |
| | | if err1 != nil { |
| | | logger.Error("getBaseInfo解压错误", err1) |
| | | return |
| | | } |
| | | //logger.Info("----------------------------------------map是", m) |
| | | ids := []string{} |
| | |
| | | logger.Info("------------------------------------------------------------------------------------------------------------------------") |
| | | //logger.Info("=====================人员id的集合为:",ids) |
| | | if len(ids) > 0 { |
| | | baseinfos, err1 := esutil.Dbpersoninfosbyid(ids, dbTablePersons, serverIp, serverPort) |
| | | var dbApi dbapi.DbPersonApi |
| | | baseinfos, err1 := dbApi.Dbpersoninfosbyid(ids) |
| | | //baseinfos, err1 := esutil.Dbpersoninfosbyid(ids, dbTablePersons, serverIp, serverPort) |
| | | if err1 != nil { |
| | | logger.Error("查询底库人员信息出错", err1) |
| | | } |
| | | var dtapi dbapi.DbTableApi |
| | | for _, baseinfo := range baseinfos { |
| | | // 根据tableId查询底库信息给liker赋值 |
| | | //logger.Info("---------看看每个底库人员的信息:",baseinfo.Id,baseinfo.PersonName) |
| | | tableIds := []string{} |
| | | tableIds = append(tableIds, baseinfo.TableId) // 虽然是传入数组返回数组的接口,但我按单个的使用了 |
| | | table, err := esutil.Dbtablefosbyid(tableIds, "dbtables", serverIp, serverPort) |
| | | table, err := dtapi.DbtablesById(tableIds) |
| | | //table, err := esutil.Dbtablefosbyid(tableIds, "dbtables", serverIp, serverPort) |
| | | if err != nil || len(table) == 0 { |
| | | logger.Error("根据id查询底库信息出错!", err, "--返回值长度为:", len(table)) |
| | | } |
| | |
| | | a.FilterData = append(a.FilterData, &arg1) |
| | | } |
| | | } |
| | | logger.Info("区域是:",areaPoints,"装配完数据区域内目标数量为:",a.TargetNum) |
| | | a.Time = time.Unix(time.Now().Unix(), 0).String()[11:16] |
| | | a.KeepRight = arg.KeepRight |
| | | a.IsStatic = arg.IsStatic |
| | | logger.Info("区域是:",areaPoints,"区域内目标数量为:",a.targetNum,"---",len(a.filterData)) |
| | | a.time = time.Unix(time.Now().Unix(), 0).String()[11:16] |
| | | a.keepRight = arg.KeepRight |
| | | a.isStatic = arg.IsStatic |
| | | //logger.Println("--------------------看看区域数据:",*a) |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | func Push(url string,data *protomsg.CompareArgs,sock mangos.Socket) []byte{ |
| | | func Push(data *protomsg.CompareArgs,sock mangos.Socket) []byte{ |
| | | //var sock mangos.Socket |
| | | var err error |
| | | var msg []byte |
| | |
| | | logger.Error("Failed set MaxRecvSize: %v", err) |
| | | return nil |
| | | } |
| | | //sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | if err = sock.Dial(url); err != nil { |
| | | serverIP, _ := GetLocalIP() |
| | | if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil { |
| | | logger.Error("请求socket拨号失败: %s", err.Error()) |
| | | } |
| | | //sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | |
| | | logger.Info("序列化数据") |
| | | bytes,err1 := proto.Marshal(data) |
| | | logger.Info("数据长度为:",len(bytes)) |
| | |
| | | //bytes := []byte("ndfasojdfaidsos") |
| | | if err = sock.Send(bytes); err != nil { |
| | | logger.Error("推送socket发送数据失败: %s", err.Error()) |
| | | os.Exit(1) |
| | | //os.Exit(1) |
| | | } |
| | | if msg, err = sock.Recv(); err != nil { |
| | | logger.Error("接收响应失败: %s", err.Error()) |
| | | os.Exit(1) |
| | | //os.Exit(1) |
| | | } |
| | | logger.Debug("数据推送成功!收到响应,数据长度为:",len(msg)) |
| | | return msg |
| | | } |
| | | |
| | | func Push1(){ |
| | | //var sock mangos.Socket |
| | | var err error |
| | | var msg []byte |
| | | |
| | | if sock, err = req.NewSocket(); err != nil { |
| | | logger.Error("创建请求socket失败: %s", err.Error()) |
| | | } |
| | | errSize := sock.SetOption(mangos.OptionMaxRecvSize,5*1024*1024) |
| | | if errSize != nil { |
| | | logger.Error("Failed set MaxRecvSize: %v", err) |
| | | } |
| | | //sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | serverIP, _ := GetLocalIP() |
| | | if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil { |
| | | logger.Error("请求socket拨号失败: %s", err.Error()) |
| | | } |
| | | logger.Info("序列化数据") |
| | | |
| | | for { |
| | | select { |
| | | // case <-ctx.Done(): |
| | | // return |
| | | case data := <- sender: |
| | | bytes,err1 := proto.Marshal(data) |
| | | logger.Info("数据长度为:",len(bytes)) |
| | | if err1 != nil { |
| | | logger.Info("序列化失败:",err1) |
| | | } |
| | | logger.Debug("推送数据") |
| | | //bytes := []byte("ndfasojdfaidsos") |
| | | if err = sock.Send(bytes); err != nil { |
| | | logger.Error("推送socket发送数据失败: %s", err.Error()) |
| | | //os.Exit(1) |
| | | } |
| | | if msg, err = sock.Recv(); err != nil { |
| | | logger.Error("接收响应失败: %s", err.Error()) |
| | | //os.Exit(1) |
| | | } |
| | | logger.Debug("数据推送成功!收到响应,数据长度为:",len(msg)) |
| | | receiver <- msg |
| | | default: |
| | | |
| | | } |
| | | } |
| | | } |
| | | func getCompareMsg(data *protomsg.CompareArgs) []byte{ |
| | | sender <- data |
| | | return <-receiver |
| | | } |
| | | // 获取本机ip |
| | | func GetLocalIP() (ipv4 string, err error) { |
| | | var ( |
| | |
| | | |
| | | err = errors.New("ipv4 not found") |
| | | return |
| | | } |
| | | } |