tcp server 用于给andriod 客户端定时发送消息
liuxiaolong
2020-06-16 fcdc5890e9a8204805f6347364c3eaa94dff948d
done yuying
2个文件已修改
504 ■■■■ 已修改文件
esutil/EsClient.go 418 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.go 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
esutil/EsClient.go
@@ -1,7 +1,6 @@
package esutil
import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
@@ -10,16 +9,13 @@
    "net/http"
    "strconv"
    "strings"
    "sync"
    "time"
    "andriodServer/extend/config"
    log "andriodServer/log"
)
func GetEsDataReq(url string, parama string, picurl string, isSource bool) (error, map[string]interface{}) {
    //log.Log.Infoln("es 查询请求路径" + url) //  配置信息 获取
    req, err := http.NewRequest("POST", url, strings.NewReader(parama))
func GetEsDataReq(url string, param string, isSource bool) (error, map[string]interface{}) {
    req, err := http.NewRequest("POST", url, strings.NewReader(param))
    if err != nil {
        return err, nil
@@ -60,72 +56,56 @@
        data["total"] = dat["total"]
        sources := []interface{}{}
        for _, value := range dat["hits"].([]interface{}) {
            d := make(map[string]interface{})
            source, ok := value.(map[string]interface{})["_source"].(map[string]interface{})
            if !ok {
                return errors.New("value is not type of map[string]interface{}"), nil
            }
            source["id"] = value.(map[string]interface{})["_id"]
            d["id"] = value.(map[string]interface{})["_id"]
            sdkType := source["sdkType"]
            if sdkType != nil {
                sdk, err := strconv.Atoi(sdkType.(string))
                if err != nil {
                    return err, nil
            pmax, exist := source["picMaxUrl"]
            if !exist {
                continue
            }
            pmArr := pmax.([]string)
            if len(pmArr) > 0 {
                d["picMaxUrl"] = pmArr[0]
            }
            tB, err := json.Marshal(source["targetInfo"])
            if err != nil {
                continue
            }
            ti := TargetInfo{}
            err = json.Unmarshal(tB, &ti)
            if err !=nil {
                continue
            }
            d["picSmUrl"] = ti.PicSmUrl
            if ti.TargetType == "face" {
                sLabelStr, ok := source["showLabels"]
                if ok {
                    labelArr := strings.Split(sLabelStr.(string), "/")
                    if len(labelArr) == 3 {
                        d["gender"] = labelArr[0]
                        d["ageDescription"] = labelArr[1]
                        d["race"] = labelArr[2]
                    }
                }
                source["sdkType"] = sdkTypeToValue(sdk)
                if bInfos,ok := source["baseInfo"]; ok && bInfos != nil {
                    d["baseInfo"] = getSourceBaseInfo(bInfos)
                } else {
                    d["baseInfo"] = []interface{}{}
                }
            }
            pmax, exist := source["picMaxUrl"].(string)
            //fmt.Println("picMaxUrl: ",pmax)
            if !exist {
                return errors.New("picMaxurl is not string"), nil
            }
            d["videoNum"] = source["videoUrl"]
            if !strings.HasPrefix(pmax, "http") {
                source["picMaxUrl"] = picurl + pmax
            }
            psm, exist := source["picSmUrl"].(string)
            if !exist {
                return errors.New("picSmUrl is not string"), nil
            }
            if !strings.HasPrefix(psm, "http") {
                source["picSmUrl"] = picurl + psm
            }
            if source["sdkType"] != "人脸" {//行为没有存储大图
                source["picMaxUrl"] = source["picSmUrl"]
            }
            prace, exist := source["Race"]
            if exist {
                source["race"] = prace
            }
            pGender, exist := source["Gender"]
            if exist {
                source["gender"] = pGender
            }
            source["ageDescription"] = getAgeDesc(source["Age"])
            source["videoNum"] = getVideoUrl(source)
            //picDate := source["picDate"].(string)
            //lastIdx := strings.LastIndex(picDate,":")
            //picDateStr := picDate[:lastIdx]
            //if err == nil {
            //    source["picDate"] = picDateStr
            //}
            baseInfo := getSourceBaseInfo(source)
            source["baseInfo"] = baseInfo
            sources = append(sources, source)
            sources = append(sources, d)
        }
        data["datalist"] = sources
        return nil, data
@@ -134,236 +114,92 @@
    }
}
func getAgeDesc(age interface{})(ageDesc string) {
    if age !=nil {
        ageInt := age.(float64)
        if ageInt >0 && ageInt<7 {
            ageDesc = "童年"
        } else if ageInt >=7 && ageInt<18 {
            ageDesc = "少年"
        } else if ageInt >=18 && ageInt<40 {
            ageDesc = "青年"
        } else if ageInt >=40 && ageInt<65 {
            ageDesc = "中年"
        } else if ageInt >=65 {
            ageDesc = "老年"
        } else {
            ageDesc = ""
        }
    }
    return ageDesc
type TargetInfo struct {
    TargetId         string          `json:"targetId"`
    TargetType         string             `json:"targetType"`
    PicSmUrl         string             `json:"picSmUrl"`
    TargetScore     float32         `json:"targetScore"`
}
type BaseInfo struct {
    TaskId string `json:"taskId"`
    TaskName string `json:"taskName"`
    LikePer string `json:"likePer"`
    TableId string `json:"tableId"`
    TableName string `json:"tableName"`
    PersonId string `json:"personId"`
    PersonPicUrl string `json:"personPicUrl"`
    PersonName string `json:"personName"`
    Gender string `json:"gender"`
    PhoneNum string `json:"phoneNum"`
    IDCard string `json:"IDCard"`
    MonitorLevel string `json:"monitorLevel"`
    Content string `json:"content"`
    TaskId             string         `json:"taskId"`
    TaskName         string         `json:"taskName"`
    LikePer         string         `json:"likePer"`
    TableId         string         `json:"tableId"`
    TableName         string         `json:"tableName"`
    PersonId         string         `json:"personId"`
    PersonPicUrl     string         `json:"personPicUrl"`
    PersonName         string         `json:"personName"`
    Gender             string         `json:"gender"`
    PhoneNum         string         `json:"phoneNum"`
    IDCard             string         `json:"IDCard"`
    MonitorLevel     string         `json:"monitorLevel"`
    Content         string         `json:"content"`
}
func getSourceBaseInfo(source map[string]interface{}) []BaseInfo {
    sdkType := source["sdkType"].(string)
    baseInfoArr := make([]BaseInfo,0)
    if sdkType == "人脸" {
        likePer,baseName,personId,idCard,personPicUrl,gender,content :="","","","","","",""
        if source["likePer"] !=nil {
            likePer = source["likePer"].(string)
        }
        if source["BaseName"] !=nil {
            baseName = source["BaseName"].(string)
        }
        if source["personId"] !=nil {
            personId = source["personId"].(string)
        }
        if source["idcard"] !=nil {
            idCard = source["idcard"].(string)
        }
        if source["personPicUrl"] !=nil {
            personPicUrl = source["personPicUrl"].(string)
        }
        if source["Gender"] !=nil {
            gender = source["Gender"].(string)
        }
        if source["content"] !=nil {
            content = source["content"].(string)
        }
        var baseInfo = BaseInfo{
            TaskId:"",//2.0新字段
            TaskName:"",//2.0新字段
            LikePer:likePer,
            TableId:"",//2.0新字段
            TableName:baseName,
            PersonId:personId,
            PersonName:idCard,//人员姓名,从管理平台获取
            PersonPicUrl:personPicUrl,
            Gender:gender,
            PhoneNum:"",//手机号,从管理平台获取
            IDCard:idCard,
            MonitorLevel:"",//2.0新字段
            Content:content,
        }
type TI struct {
    BwType           string      `json:"bwType"`
    TargetPicUrl     string         `json:"targetPicUrl"`
    TargetName         string         `json:"targetName"`
    TargetId         string         `json:"targetId"`
    TableId         string         `json:"tableId"`
    CompareScore     string         `json:"compareScore"`
    MonitorLevel     string         `json:"monitorLevel"`
    Content         string         `json:"content"`
    TableName         string         `json:"tableName"`
    Labels             string         `json:"labels"`
}
        baseInfoArr = append(baseInfoArr, baseInfo)
        //bytes, err := json.Marshal(baseInfoArr)
        //if err !=nil {
        //    return ""
        //}
func getSourceBaseInfo(bInfos interface{}) []BaseInfo {
    baseInfoArr := make([]BaseInfo,0)
    b, err := json.Marshal(bInfos)
    if err == nil {
        var targetArr []TI
        if err = json.Unmarshal(b, &targetArr); err == nil && len(targetArr) >0 {
            for _,t := range targetArr {
                idCard,sex := "",""
                if t.Labels != "" {
                    arr := strings.Split(t.Labels, "/")
                    if len(arr) > 0 {
                        for _,str := range arr {
                            if str == "男" || str == "女" {
                                sex = str
                                break
                            }
                        }
                        for _,str := range arr {
                            if len(str) == 18 {
                                idCard = str
                                break
                            }
                        }
                    }
                }
                baseInfoArr = append(baseInfoArr, BaseInfo{
                    TaskId: "",//2.0新字段
                    TaskName: "",//2.0新字段
                    LikePer: t.CompareScore,
                    TableId: "",//2.0新字段
                    TableName: t.TableName,
                    PersonId: t.TargetId,
                    PersonName: t.TargetName,//人员姓名,从管理平台获取
                    PersonPicUrl: t.TargetPicUrl,
                    Gender: sex,
                    PhoneNum: "",//手机号,从管理平台获取
                    IDCard: idCard,
                    MonitorLevel: t.MonitorLevel,//2.0新字段
                    Content: t.Content,
                })
            }
        }
    }
    return baseInfoArr
}
var videoCacheMap = make(map[string]string,0)
var lock sync.RWMutex
func setVideoCache(imgKey string, url string) {
    lock.Lock()
    defer lock.Unlock()
    videoCacheMap[imgKey] = url
}
func getVideoFromCache(imgKey string) string {
    lock.Lock()
    defer lock.Unlock()
    if v,ok := videoCacheMap[imgKey];ok {
        return v
    }
    return ""
}
func getVideoUrl(source map[string]interface{}) (videoUrl string){
    imgKey := source["imgKey"].(string)
    //先从缓存里面取
    cacheUrl := getVideoFromCache(imgKey)
    if cacheUrl !="" {
        return cacheUrl
    }
    picDate := source["picDate"].(string)//抓拍日期
    cameraId := source["videoReqNum"].(string)//摄像机id
    indeviceId := source["indeviceid"].(string)//分析设备id
    reqUrl := ""
    if url,ok := config.ServerMap[indeviceId];!ok {
        return ""
    } else {
        reqUrl = url
    }
    log.Log.Infoln("reqUrl:",reqUrl)
    paramMap := make(map[string]interface{},0)
    paramMap["imgKey"] = imgKey
    paramMap["picDate"] = picDate
    paramMap["videoNum"] = cameraId
    respBytes, err := doPostRequest(reqUrl, "application/json", paramMap, nil, nil)
    if err !=nil{
        return ""
    }
    var resp RespVideo
    err = json.Unmarshal(respBytes, &resp)
    if err !=nil {
        return ""
    }
    filePath := resp.FilePath
    videoUrl = ""
    if !strings.Contains(filePath, "/cut"){
        videoUrl = ""
        fmt.Printf("videoReqUrl:%s ,imgKey:%s ,picDate:%s ,cameraId:%s ,filePath:%s \n ",reqUrl,imgKey,picDate,cameraId,filePath)
    } else {
        strArr := strings.Split(filePath, "/cut")
        ngxUrl := config.NgxMap[indeviceId]
        log.Log.Infoln("ngxUrl:",ngxUrl)
        if ngxUrl !="" && len(strArr) >0 {
            videoUrl = ngxUrl + strArr[1]
        }
    }
    if videoUrl == ""{
        respMap := make(map[string]interface{},0)
        err := json.Unmarshal(respBytes, &respMap)
        if err !=nil {
            fmt.Println("resp UnmarshalToMap err: ",err)
        }
        fmt.Println("resp: ",respMap)
    }
    setVideoCache(imgKey, videoUrl)
    return videoUrl
}
type RespVideo struct{
    FilePath string `json:"file_path"`
}
func doPostRequest(url string, contentType string, body map[string]interface{}, params map[string]string, headers map[string]string) ([]byte, error) {
    var resultBytes []byte
    var bodyJson []byte
    if body != nil {
        var err error
        bodyJson, err = json.Marshal(body)
        if err != nil {
            log.Log.Errorln("doPostRequestMarshal err:",err)
            return resultBytes, err
        }
    }
    request, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyJson))
    if err != nil {
        log.Log.Errorln("NewRequest ERR:",err)
        return resultBytes, err
    }
    request.Header.Set("Content-type", contentType)
    //add params
    q := request.URL.Query()
    if params != nil {
        for key, val := range params {
            q.Add(key, val)
        }
        request.URL.RawQuery = q.Encode()
    }
    // add headers
    if headers != nil {
        for key, val := range headers {
            request.Header.Add(key, val)
        }
    }
    timeOut:= time.Duration(8*time.Second)//set request timeout
    client := &http.Client{
        Timeout:timeOut,
    }
    resp, err := client.Do(request)
    if err != nil {
        log.Log.Errorln("DoRequest ERR:",err)
        return resultBytes, err
    }
    defer resp.Body.Close()
    resultBytes, err = ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Log.Errorln("ReadAll ERR:",err)
        return resultBytes, err
    }
    return resultBytes, nil
}
//sdk类型
func sdkTypeToValue(i int) string {
    value := []string{"人脸", "车辆", "人体", "入侵", "拥挤", "靠右行", "人员异常", "个体静止"}
    return value[i-1]
}
func PostAction(sec int, Eurl string, picurl string, ishub string, size int, lastT time.Time, curTime time.Time) []byte {
    //lastTimeStr := lastT.Format("2006-01-02 15:04:05")
    //curTimeStr := curTime.Format("2006-01-02 15:04:05")
    index := "videopersons,personaction"
func PostAction(sec int, Eurl string, ishub string, size int, lastT time.Time, curTime time.Time) []byte {
    index := "ai_ocean"
    url := fmt.Sprintf("%s%s%s", Eurl, index, "/_search")
    sizeStr :=""
@@ -379,8 +215,9 @@
        preSec = strconv.Itoa(sec)
    }
    var filterArr []string
    //是否查报警数据
    if ishub == "hub" {
        filterArr = append(filterArr,"{\"term\":{\"personIsHub\":\"1\"}}")
        filterArr = append(filterArr,"{\"term\":{\"alarmRules.alarmLevel.raw\":\"一级\"}}")
    }
    filterArr = append(filterArr, "{\"range\":{\"picDate\":{\"gte\":\"now+8h-"+preSec+"s\",\"lt\":\"now+8h\"}}}")
@@ -389,11 +226,22 @@
    if len(filterArr) >0 {
        filterStr = strings.Join(filterArr, ",")
    }
    sourceArr := []string{
        "baseInfo",
        "targetInfo",
        "content",
        "id",
        "picMaxUrl",
        "picDate",
        "showLabels",
        "taskName",
    }
    sourceStr := strings.Join(sourceArr, ",")
    log.Log.Infoln("filterArr:", filterStr)
    prama := "{\"query\":{\"bool\":{\"filter\":["+filterStr+"]}},\"size\":\""+sizeStr+"\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," +
        "\"_source\":[\"baseInfo\",\"Gender\",\"BaseName\",\"Age\",\"personId\",\"personPicUrl\",\"indeviceName\",\"imgKey\",\"sdkType\",\"ageDescription\",\"indeviceid\",\"content\",\"Id\",\"picAddress\",\"picMaxUrl\",\"picDate\",\"Race\",\"videoNum\",\"picSmUrl\",\"taskName\",\"personIsHub\",\"idcard\",\"videoIp\",\"videoReqNum\"]" +
    param := "{\"query\":{\"bool\":{\"filter\":["+filterStr+"]}},\"size\":\""+sizeStr+"\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," +
        "\"_source\":{\"includes\":["+sourceStr+"],\"excludes\":[\"*.feature\",\"*.attachTarget\",\"*.targetLocation\",\"alarmRules\"]}" +
        "}"
    err, tokenRes := GetEsDataReq(url, prama, picurl, true)
    err, tokenRes := GetEsDataReq(url, param, true)
    if err != nil {
        log.Log.Errorln(err)
server.go
@@ -3,7 +3,6 @@
import (
    "flag"
    "fmt"
    "net"
    "strconv"
    "time"
@@ -14,8 +13,8 @@
)
var addr = flag.String("addr", "0.0.0.0", "The address to listen to")
var Eurl = flag.String("eurl", "http://192.168.1.182:9200/", "The port to listen on")
var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url")
var Eurl = flag.String("eurl", "http://172.16.50.241:9200/", "The port to listen on")
//var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url")
var port = flag.Int("port", 6000, "The port to listen on")
var sec = flag.Int("sec", 10, "the second for query data")
@@ -36,28 +35,6 @@
    log.SetLogLevel(*Level)
    config.Init(*env, *confPath)
    fmt.Println(*port)
    //src := *addr + ":" + strconv.Itoa(*port)
    //listener, err := net.Listen("tcp", src)
    //if err != nil {
    //    log.Log.Errorln(err)
    //    return
    //}
    //log.Log.Infof("Listening on %s.\n", src)
    //fmt.Println("starting server success.")
    //defer listener.Close()
    //connArr:=make([]net.Conn,0)
    //for {
    //    conn, err := listener.Accept()//
    //
    //    connArr = append(connArr,conn)
    //    if err != nil {
    //        log.Log.Infoln("some connecion error: ", err)
    //    }
    //    go handleConnection(conn,connArr)
    //}
    mqAddr := "amqp://" + *mqUser + ":" + *mqPass + "@" + *mqIp + ":" + strconv.Itoa(*mqPort)+"/"
@@ -97,7 +74,7 @@
        select {
        case <-tick.C:
            curTime := time.Now()
            alarmData := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastTime, curTime)
            alarmData := esutil.PostAction(*sec, *Eurl, *IsHub, *Size, lastTime, curTime)
            if alarmData != nil {
                err := ch.Publish(
                    "",
@@ -117,59 +94,4 @@
            lastTime = curTime
        }
    }
}
func handleConnection(conn net.Conn, connArr []net.Conn) {
    remoteAddr := conn.RemoteAddr().String()
    log.Log.Infoln("Client connected from ", remoteAddr)
    ech := make(chan error)
    go func(conn net.Conn, ech chan error) {
        buf := make([]byte, 10)
        readMsg, err := conn.Read(buf)
        log.Log.Infoln("Read completed,readMsg:",readMsg,",err:",err)
        ech <- err
    }(conn, ech)
    tick := time.NewTicker(3 * time.Second)
    lastTime := time.Now()
    for {
        select {
        case <-tick.C:
            curTime := time.Now()
            if !handleMessage(conn, connArr, lastTime, curTime){
                conn.Close()
                return
            }
            lastTime = curTime
        case err := <-ech:
            log.Log.Infoln(err, "remoteAddr ", remoteAddr, " close")
            conn.Close()
            return
        }
    }
    log.Log.Infoln("Client at " + remoteAddr + " disconnected.")
}
func handleMessage(conn net.Conn, connArr []net.Conn,lastT time.Time, curTime time.Time) bool {
    jsonstring := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size, lastT, curTime)
    if jsonstring == nil {
        log.Log.Infoln("the data is nil,remoteArr:",conn.RemoteAddr())
        if _, err := conn.Write([]byte("\000"));err !=nil {
            log.Log.Infoln("conn.WriteErr:",err)
            return false
        } else {
            return true
        }
    }
    jsonstring = append(jsonstring, []byte("\000")...)
    log.Log.Infoln("jsonstring len: ", len(jsonstring), "\000 data: ", len("\000"))
    _, err := conn.Write(jsonstring)
    if err !=nil{
        log.Log.Infoln("conn.WriteErr:",err)
        return false
    }
    return true
}
}