From fcdc5890e9a8204805f6347364c3eaa94dff948d Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 16 六月 2020 18:50:40 +0800 Subject: [PATCH] done yuying --- server.go | 86 --------- esutil/EsClient.go | 418 ++++++++++++++------------------------------- 2 files changed, 137 insertions(+), 367 deletions(-) diff --git a/esutil/EsClient.go b/esutil/EsClient.go index bc1e81f..eb11b70 100644 --- a/esutil/EsClient.go +++ b/esutil/EsClient.go @@ -1,7 +1,6 @@ package esutil import ( - "bytes" "encoding/json" "errors" "fmt" @@ -10,16 +9,13 @@ "net/http" "strconv" "strings" - "sync" "time" - "andriodServer/extend/config" log "andriodServer/log" ) -func GetEsDataReq(url string, parama string, picurl string, isSource bool) (error, map[string]interface{}) { - //log.Log.Infoln("es 鏌ヨ璇锋眰璺緞" + url) // 閰嶇疆淇℃伅 鑾峰彇 - req, err := http.NewRequest("POST", url, strings.NewReader(parama)) +func GetEsDataReq(url string, param string, isSource bool) (error, map[string]interface{}) { + req, err := http.NewRequest("POST", url, strings.NewReader(param)) if err != nil { return err, nil @@ -60,72 +56,56 @@ data["total"] = dat["total"] sources := []interface{}{} for _, value := range dat["hits"].([]interface{}) { + d := make(map[string]interface{}) source, ok := value.(map[string]interface{})["_source"].(map[string]interface{}) if !ok { return errors.New("value is not type of map[string]interface{}"), nil } - source["id"] = value.(map[string]interface{})["_id"] + d["id"] = value.(map[string]interface{})["_id"] - sdkType := source["sdkType"] - if sdkType != nil { - sdk, err := strconv.Atoi(sdkType.(string)) - if err != nil { - return err, nil + pmax, exist := source["picMaxUrl"] + if !exist { + continue + } + pmArr := pmax.([]string) + if len(pmArr) > 0 { + d["picMaxUrl"] = pmArr[0] + } + + tB, err := json.Marshal(source["targetInfo"]) + if err != nil { + continue + } + + ti := TargetInfo{} + err = json.Unmarshal(tB, &ti) + if err !=nil { + continue + } + + d["picSmUrl"] = ti.PicSmUrl + if ti.TargetType == "face" { + sLabelStr, ok := source["showLabels"] + if ok { + labelArr := strings.Split(sLabelStr.(string), "/") + if len(labelArr) == 3 { + d["gender"] = labelArr[0] + d["ageDescription"] = labelArr[1] + d["race"] = labelArr[2] + } } - source["sdkType"] = sdkTypeToValue(sdk) + if bInfos,ok := source["baseInfo"]; ok && bInfos != nil { + d["baseInfo"] = getSourceBaseInfo(bInfos) + } else { + d["baseInfo"] = []interface{}{} + } } - pmax, exist := source["picMaxUrl"].(string) - //fmt.Println("picMaxUrl: ",pmax) - if !exist { - return errors.New("picMaxurl is not string"), nil - } + d["videoNum"] = source["videoUrl"] - - if !strings.HasPrefix(pmax, "http") { - source["picMaxUrl"] = picurl + pmax - - } - - psm, exist := source["picSmUrl"].(string) - if !exist { - return errors.New("picSmUrl is not string"), nil - } - - if !strings.HasPrefix(psm, "http") { - source["picSmUrl"] = picurl + psm - } - - if source["sdkType"] != "浜鸿劯" {//琛屼负娌℃湁瀛樺偍澶у浘 - source["picMaxUrl"] = source["picSmUrl"] - } - - prace, exist := source["Race"] - if exist { - source["race"] = prace - } - - pGender, exist := source["Gender"] - if exist { - source["gender"] = pGender - } - - source["ageDescription"] = getAgeDesc(source["Age"]) - - source["videoNum"] = getVideoUrl(source) - //picDate := source["picDate"].(string) - //lastIdx := strings.LastIndex(picDate,":") - //picDateStr := picDate[:lastIdx] - //if err == nil { - // source["picDate"] = picDateStr - //} - - baseInfo := getSourceBaseInfo(source) - source["baseInfo"] = baseInfo - - sources = append(sources, source) + sources = append(sources, d) } data["datalist"] = sources return nil, data @@ -134,236 +114,92 @@ } } -func getAgeDesc(age interface{})(ageDesc string) { - if age !=nil { - ageInt := age.(float64) - if ageInt >0 && ageInt<7 { - ageDesc = "绔ュ勾" - } else if ageInt >=7 && ageInt<18 { - ageDesc = "灏戝勾" - } else if ageInt >=18 && ageInt<40 { - ageDesc = "闈掑勾" - } else if ageInt >=40 && ageInt<65 { - ageDesc = "涓勾" - } else if ageInt >=65 { - ageDesc = "鑰佸勾" - } else { - ageDesc = "" - } - } - return ageDesc +type TargetInfo struct { + TargetId string `json:"targetId"` + TargetType string `json:"targetType"` + PicSmUrl string `json:"picSmUrl"` + TargetScore float32 `json:"targetScore"` } type BaseInfo struct { - TaskId string `json:"taskId"` - TaskName string `json:"taskName"` - LikePer string `json:"likePer"` - TableId string `json:"tableId"` - TableName string `json:"tableName"` - PersonId string `json:"personId"` - PersonPicUrl string `json:"personPicUrl"` - PersonName string `json:"personName"` - Gender string `json:"gender"` - PhoneNum string `json:"phoneNum"` - IDCard string `json:"IDCard"` - MonitorLevel string `json:"monitorLevel"` - Content string `json:"content"` + TaskId string `json:"taskId"` + TaskName string `json:"taskName"` + LikePer string `json:"likePer"` + TableId string `json:"tableId"` + TableName string `json:"tableName"` + PersonId string `json:"personId"` + PersonPicUrl string `json:"personPicUrl"` + PersonName string `json:"personName"` + Gender string `json:"gender"` + PhoneNum string `json:"phoneNum"` + IDCard string `json:"IDCard"` + MonitorLevel string `json:"monitorLevel"` + Content string `json:"content"` } -func getSourceBaseInfo(source map[string]interface{}) []BaseInfo { - sdkType := source["sdkType"].(string) - baseInfoArr := make([]BaseInfo,0) - if sdkType == "浜鸿劯" { - likePer,baseName,personId,idCard,personPicUrl,gender,content :="","","","","","","" - if source["likePer"] !=nil { - likePer = source["likePer"].(string) - } - if source["BaseName"] !=nil { - baseName = source["BaseName"].(string) - } - if source["personId"] !=nil { - personId = source["personId"].(string) - } - if source["idcard"] !=nil { - idCard = source["idcard"].(string) - } - if source["personPicUrl"] !=nil { - personPicUrl = source["personPicUrl"].(string) - } - if source["Gender"] !=nil { - gender = source["Gender"].(string) - } - if source["content"] !=nil { - content = source["content"].(string) - } - var baseInfo = BaseInfo{ - TaskId:"",//2.0鏂板瓧娈� - TaskName:"",//2.0鏂板瓧娈� - LikePer:likePer, - TableId:"",//2.0鏂板瓧娈� - TableName:baseName, - PersonId:personId, - PersonName:idCard,//浜哄憳濮撳悕锛屼粠绠$悊骞冲彴鑾峰彇 - PersonPicUrl:personPicUrl, - Gender:gender, - PhoneNum:"",//鎵嬫満鍙凤紝浠庣鐞嗗钩鍙拌幏鍙� - IDCard:idCard, - MonitorLevel:"",//2.0鏂板瓧娈� - Content:content, - } +type TI struct { + BwType string `json:"bwType"` + TargetPicUrl string `json:"targetPicUrl"` + TargetName string `json:"targetName"` + TargetId string `json:"targetId"` + TableId string `json:"tableId"` + CompareScore string `json:"compareScore"` + MonitorLevel string `json:"monitorLevel"` + Content string `json:"content"` + TableName string `json:"tableName"` + Labels string `json:"labels"` +} - baseInfoArr = append(baseInfoArr, baseInfo) - //bytes, err := json.Marshal(baseInfoArr) - //if err !=nil { - // return "" - //} +func getSourceBaseInfo(bInfos interface{}) []BaseInfo { + baseInfoArr := make([]BaseInfo,0) + + b, err := json.Marshal(bInfos) + if err == nil { + var targetArr []TI + if err = json.Unmarshal(b, &targetArr); err == nil && len(targetArr) >0 { + for _,t := range targetArr { + idCard,sex := "","" + if t.Labels != "" { + arr := strings.Split(t.Labels, "/") + if len(arr) > 0 { + for _,str := range arr { + if str == "鐢�" || str == "濂�" { + sex = str + break + } + } + for _,str := range arr { + if len(str) == 18 { + idCard = str + break + } + } + } + } + baseInfoArr = append(baseInfoArr, BaseInfo{ + TaskId: "",//2.0鏂板瓧娈� + TaskName: "",//2.0鏂板瓧娈� + LikePer: t.CompareScore, + TableId: "",//2.0鏂板瓧娈� + TableName: t.TableName, + PersonId: t.TargetId, + PersonName: t.TargetName,//浜哄憳濮撳悕锛屼粠绠$悊骞冲彴鑾峰彇 + PersonPicUrl: t.TargetPicUrl, + Gender: sex, + PhoneNum: "",//鎵嬫満鍙凤紝浠庣鐞嗗钩鍙拌幏鍙� + IDCard: idCard, + MonitorLevel: t.MonitorLevel,//2.0鏂板瓧娈� + Content: t.Content, + }) + } + } } + return baseInfoArr } -var videoCacheMap = make(map[string]string,0) -var lock sync.RWMutex - -func setVideoCache(imgKey string, url string) { - lock.Lock() - defer lock.Unlock() - videoCacheMap[imgKey] = url -} - -func getVideoFromCache(imgKey string) string { - lock.Lock() - defer lock.Unlock() - if v,ok := videoCacheMap[imgKey];ok { - return v - } - return "" -} - -func getVideoUrl(source map[string]interface{}) (videoUrl string){ - imgKey := source["imgKey"].(string) - - //鍏堜粠缂撳瓨閲岄潰鍙� - cacheUrl := getVideoFromCache(imgKey) - if cacheUrl !="" { - return cacheUrl - } - - picDate := source["picDate"].(string)//鎶撴媿鏃ユ湡 - cameraId := source["videoReqNum"].(string)//鎽勫儚鏈篿d - indeviceId := source["indeviceid"].(string)//鍒嗘瀽璁惧id - - reqUrl := "" - if url,ok := config.ServerMap[indeviceId];!ok { - return "" - } else { - reqUrl = url - } - log.Log.Infoln("reqUrl:",reqUrl) - - paramMap := make(map[string]interface{},0) - paramMap["imgKey"] = imgKey - paramMap["picDate"] = picDate - paramMap["videoNum"] = cameraId - - respBytes, err := doPostRequest(reqUrl, "application/json", paramMap, nil, nil) - if err !=nil{ - return "" - } - - var resp RespVideo - err = json.Unmarshal(respBytes, &resp) - if err !=nil { - return "" - } - - filePath := resp.FilePath - videoUrl = "" - if !strings.Contains(filePath, "/cut"){ - videoUrl = "" - fmt.Printf("videoReqUrl:%s ,imgKey:%s ,picDate:%s ,cameraId:%s ,filePath:%s \n ",reqUrl,imgKey,picDate,cameraId,filePath) - } else { - strArr := strings.Split(filePath, "/cut") - ngxUrl := config.NgxMap[indeviceId] - log.Log.Infoln("ngxUrl:",ngxUrl) - if ngxUrl !="" && len(strArr) >0 { - videoUrl = ngxUrl + strArr[1] - } - } - if videoUrl == ""{ - respMap := make(map[string]interface{},0) - err := json.Unmarshal(respBytes, &respMap) - if err !=nil { - fmt.Println("resp UnmarshalToMap err: ",err) - } - fmt.Println("resp: ",respMap) - } - setVideoCache(imgKey, videoUrl) - return videoUrl -} - -type RespVideo struct{ - FilePath string `json:"file_path"` -} -func doPostRequest(url string, contentType string, body map[string]interface{}, params map[string]string, headers map[string]string) ([]byte, error) { - var resultBytes []byte - var bodyJson []byte - if body != nil { - var err error - bodyJson, err = json.Marshal(body) - if err != nil { - log.Log.Errorln("doPostRequestMarshal err:",err) - return resultBytes, err - } - } - request, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyJson)) - if err != nil { - log.Log.Errorln("NewRequest ERR:",err) - return resultBytes, err - } - request.Header.Set("Content-type", contentType) - //add params - q := request.URL.Query() - if params != nil { - for key, val := range params { - q.Add(key, val) - } - request.URL.RawQuery = q.Encode() - } - // add headers - if headers != nil { - for key, val := range headers { - request.Header.Add(key, val) - } - } - timeOut:= time.Duration(8*time.Second)//set request timeout - client := &http.Client{ - Timeout:timeOut, - } - resp, err := client.Do(request) - if err != nil { - log.Log.Errorln("DoRequest ERR:",err) - return resultBytes, err - } - defer resp.Body.Close() - resultBytes, err = ioutil.ReadAll(resp.Body) - if err != nil { - log.Log.Errorln("ReadAll ERR:",err) - return resultBytes, err - } - return resultBytes, nil -} - -//sdk绫诲瀷 -func sdkTypeToValue(i int) string { - value := []string{"浜鸿劯", "杞﹁締", "浜轰綋", "鍏ヤ镜", "鎷ユ尋", "闈犲彸琛�", "浜哄憳寮傚父", "涓綋闈欐"} - - return value[i-1] -} - -func PostAction(sec int, Eurl string, picurl string, ishub string, size int, lastT time.Time, curTime time.Time) []byte { - //lastTimeStr := lastT.Format("2006-01-02 15:04:05") - //curTimeStr := curTime.Format("2006-01-02 15:04:05") - index := "videopersons,personaction" +func PostAction(sec int, Eurl string, ishub string, size int, lastT time.Time, curTime time.Time) []byte { + index := "ai_ocean" url := fmt.Sprintf("%s%s%s", Eurl, index, "/_search") sizeStr :="" @@ -379,8 +215,9 @@ preSec = strconv.Itoa(sec) } var filterArr []string + //鏄惁鏌ユ姤璀︽暟鎹� if ishub == "hub" { - filterArr = append(filterArr,"{\"term\":{\"personIsHub\":\"1\"}}") + filterArr = append(filterArr,"{\"term\":{\"alarmRules.alarmLevel.raw\":\"涓�绾"}}") } filterArr = append(filterArr, "{\"range\":{\"picDate\":{\"gte\":\"now+8h-"+preSec+"s\",\"lt\":\"now+8h\"}}}") @@ -389,11 +226,22 @@ if len(filterArr) >0 { filterStr = strings.Join(filterArr, ",") } + sourceArr := []string{ + "baseInfo", + "targetInfo", + "content", + "id", + "picMaxUrl", + "picDate", + "showLabels", + "taskName", + } + sourceStr := strings.Join(sourceArr, ",") log.Log.Infoln("filterArr:", filterStr) - prama := "{\"query\":{\"bool\":{\"filter\":["+filterStr+"]}},\"size\":\""+sizeStr+"\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," + - "\"_source\":[\"baseInfo\",\"Gender\",\"BaseName\",\"Age\",\"personId\",\"personPicUrl\",\"indeviceName\",\"imgKey\",\"sdkType\",\"ageDescription\",\"indeviceid\",\"content\",\"Id\",\"picAddress\",\"picMaxUrl\",\"picDate\",\"Race\",\"videoNum\",\"picSmUrl\",\"taskName\",\"personIsHub\",\"idcard\",\"videoIp\",\"videoReqNum\"]" + + param := "{\"query\":{\"bool\":{\"filter\":["+filterStr+"]}},\"size\":\""+sizeStr+"\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," + + "\"_source\":{\"includes\":["+sourceStr+"],\"excludes\":[\"*.feature\",\"*.attachTarget\",\"*.targetLocation\",\"alarmRules\"]}" + "}" - err, tokenRes := GetEsDataReq(url, prama, picurl, true) + err, tokenRes := GetEsDataReq(url, param, true) if err != nil { log.Log.Errorln(err) diff --git a/server.go b/server.go index 0a1a4c8..e4ec3ee 100644 --- a/server.go +++ b/server.go @@ -3,7 +3,6 @@ import ( "flag" "fmt" - "net" "strconv" "time" @@ -14,8 +13,8 @@ ) var addr = flag.String("addr", "0.0.0.0", "The address to listen to") -var Eurl = flag.String("eurl", "http://192.168.1.182:9200/", "The port to listen on") -var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url") +var Eurl = flag.String("eurl", "http://172.16.50.241:9200/", "The port to listen on") +//var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url") var port = flag.Int("port", 6000, "The port to listen on") var sec = flag.Int("sec", 10, "the second for query data") @@ -36,28 +35,6 @@ log.SetLogLevel(*Level) config.Init(*env, *confPath) fmt.Println(*port) - //src := *addr + ":" + strconv.Itoa(*port) - //listener, err := net.Listen("tcp", src) - //if err != nil { - // log.Log.Errorln(err) - // return - //} - //log.Log.Infof("Listening on %s.\n", src) - - //fmt.Println("starting server success.") - //defer listener.Close() - - //connArr:=make([]net.Conn,0) - - //for { - // conn, err := listener.Accept()// - // - // connArr = append(connArr,conn) - // if err != nil { - // log.Log.Infoln("some connecion error: ", err) - // } - // go handleConnection(conn,connArr) - //} mqAddr := "amqp://" + *mqUser + ":" + *mqPass + "@" + *mqIp + ":" + strconv.Itoa(*mqPort)+"/" @@ -97,7 +74,7 @@ select { case <-tick.C: curTime := time.Now() - alarmData := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastTime, curTime) + alarmData := esutil.PostAction(*sec, *Eurl, *IsHub, *Size, lastTime, curTime) if alarmData != nil { err := ch.Publish( "", @@ -117,59 +94,4 @@ lastTime = curTime } } -} - -func handleConnection(conn net.Conn, connArr []net.Conn) { - remoteAddr := conn.RemoteAddr().String() - log.Log.Infoln("Client connected from ", remoteAddr) - - ech := make(chan error) - go func(conn net.Conn, ech chan error) { - buf := make([]byte, 10) - readMsg, err := conn.Read(buf) - log.Log.Infoln("Read completed,readMsg:",readMsg,",err:",err) - ech <- err - - }(conn, ech) - - tick := time.NewTicker(3 * time.Second) - lastTime := time.Now() - for { - select { - case <-tick.C: - curTime := time.Now() - if !handleMessage(conn, connArr, lastTime, curTime){ - conn.Close() - return - } - lastTime = curTime - case err := <-ech: - log.Log.Infoln(err, "remoteAddr ", remoteAddr, " close") - conn.Close() - return - } - } - - log.Log.Infoln("Client at " + remoteAddr + " disconnected.") -} - -func handleMessage(conn net.Conn, connArr []net.Conn,lastT time.Time, curTime time.Time) bool { - jsonstring := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastT, curTime) - if jsonstring == nil { - log.Log.Infoln("the data is nil,remoteArr:",conn.RemoteAddr()) - if _, err := conn.Write([]byte("\000"));err !=nil { - log.Log.Infoln("conn.WriteErr:",err) - return false - } else { - return true - } - } - jsonstring = append(jsonstring, []byte("\000")...) - log.Log.Infoln("jsonstring len: ", len(jsonstring), "\000 data: ", len("\000")) - _, err := conn.Write(jsonstring) - if err !=nil{ - log.Log.Infoln("conn.WriteErr:",err) - return false - } - return true -} +} \ No newline at end of file -- Gitblit v1.8.0