tcp server 用于给andriod 客户端定时发送消息
liuxiaolong
2019-11-01 a9cd9edbbca168e74a434b5d3c714de58488add3
set size for get data,cache videoUrl
2个文件已添加
2个文件已修改
156 ■■■■ 已修改文件
config/config.go 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.yaml 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
esutil/EsClient.go 76 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.go 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go
New file
@@ -0,0 +1,38 @@
package config
import (
    "log"
    "strconv"
    "github.com/spf13/viper"
)
type server []ServerInfo
type ServerInfo struct {
    ServerId string `mapstructure: "serverId"`
    ServerIp string `mapstructure: "serverIp"`
    VideoPort int   `mapstructure: "videoPort"`
    NginxIp string  `mapstructure: "ngxIp"`
    NginxPort int   `mapstructure: "ngxPort"`
}
var serverList = &server{}
var ServerMap = make(map[string]string,0)
var NgxMap = make(map[string]string,0)
func Init(env string) {
    var err error
    viper.SetConfigType("yaml")
    viper.SetConfigName(env)
    viper.AddConfigPath("./config/")
    err = viper.ReadInConfig()
    if err != nil {
        log.Fatal("error on parsing configuration file")
    }
    viper.UnmarshalKey("server", serverList)
    for _,s :=range *serverList {
        ServerMap[s.ServerId] = "http://"+s.ServerIp+":"+strconv.Itoa(s.VideoPort)+"/getRecordVideoPath"
        NgxMap[s.ServerId] = "http://"+s.NginxIp+":"+strconv.Itoa(s.NginxPort)+"/videosource"
    }
}
config/config.yaml
New file
@@ -0,0 +1,25 @@
server:
-
  serverId: DSVAD010120181119
  serverIp: 172.17.50.241
  videoPort: 11111
  ngxIp: 58.118.225.79
  ngxPort: 44180
-
  serverId: DSVAD010220181119
  serverIp: 172.17.50.242
  videoPort: 11111
  ngxIp: 58.118.225.79
  ngxPort: 44280
-
  serverId: DSVAD010320181119
  serverIp: 172.17.50.243
  videoPort: 11111
  ngxIp: 58.118.225.79
  ngxPort: 44380
-
  serverId: DSVAD010420181119
  serverIp: 172.17.50.244
  videoPort: 11111
  ngxIp: 58.118.225.79
  ngxPort: 44480
esutil/EsClient.go
@@ -10,7 +10,9 @@
    "net/http"
    "strconv"
    "strings"
    "sync"
    "time"
    "github.com/long/config"
    log "github.com/long/test/log"
)
@@ -219,25 +221,44 @@
    return baseInfoArr
}
var videoCacheMap = make(map[string]string,0)
var lock sync.RWMutex
func setVideoCache(imgKey string, url string) {
    lock.Lock()
    defer lock.Unlock()
    videoCacheMap[imgKey] = url
}
func getVideoFromCache(imgKey string) string {
    lock.Lock()
    defer lock.Unlock()
    if v,ok := videoCacheMap[imgKey];ok {
        return v
    }
    return ""
}
func getVideoUrl(source map[string]interface{}) (videoUrl string){
    imgKey := source["imgKey"].(string)
    //先从缓存里面取
    cacheUrl := getVideoFromCache(imgKey)
    if cacheUrl !="" {
        return cacheUrl
    }
    picDate := source["picDate"].(string)//抓拍日期
    cameraId := source["videoReqNum"].(string)//摄像机id
    indeviceId := source["indeviceid"].(string)//分析设备id
    deviceMap := make(map[string]string,0)
    deviceMap["DSVAD010120181119"] = "http://172.17.50.241:11111/getRecordVideoPath"
    deviceMap["DSVAD010220181119"] = "http://172.17.50.242:11111/getRecordVideoPath"
    deviceMap["DSVAD010320181119"] = "http://172.17.50.243:11111/getRecordVideoPath"
    deviceMap["DSVAD010420181119"] = "http://172.17.50.244:11111/getRecordVideoPath"
    ngxMap := make(map[string]string,0)
    ngxMap["DSVAD010120181119"] = "http://58.118.225.79:44180/videosource"
    ngxMap["DSVAD010220181119"] = "http://58.118.225.79:44280/videosource"
    ngxMap["DSVAD010320181119"] = "http://58.118.225.79:44380/videosource"
    ngxMap["DSVAD010420181119"] = "http://58.118.225.79:44480/videosource"
    reqUrl := ""
    if url,ok := config.ServerMap[indeviceId];!ok {
        return ""
    } else {
        reqUrl = url
    }
    reqUrl := deviceMap[indeviceId]
    paramMap := make(map[string]interface{},0)
    paramMap["imgKey"] = imgKey
    paramMap["picDate"] = picDate
@@ -261,7 +282,7 @@
        fmt.Printf("videoReqUrl:%s ,imgKey:%s ,picDate:%s ,cameraId:%s ,filePath:%s \n ",reqUrl,imgKey,picDate,cameraId,filePath)
    } else {
        strArr := strings.Split(filePath, "/cut")
        ngxUrl := ngxMap[indeviceId]
        ngxUrl := config.NgxMap[indeviceId]
        if ngxUrl !="" && len(strArr) >0 {
            videoUrl = ngxUrl + strArr[1]
        }
@@ -274,6 +295,7 @@
        }
        fmt.Println("resp: ",respMap)
    }
    setVideoCache(imgKey, videoUrl)
    return videoUrl
}
@@ -336,24 +358,38 @@
    return value[i-1]
}
func PostAction(sec int, Eurl string, picurl string, ishub string) []byte {
func PostAction(sec int, Eurl string, picurl string, ishub string, size int) []byte {
    index := "videopersons,personaction"
    url := fmt.Sprintf("%s%s%s", Eurl, index, "/_search")
    startTime := time.Now()
    seccond := strconv.Itoa(sec)
    preSec := ""
    if sec > 60 {
        preSec = "-60s"
    }
    var ishubReq =""
    if ishub == "hub" {
        ishubReq = "{\"term\":{\"personIsHub\":\"1\"}},"
    sizeStr :=""
    if size <=0 {
        sizeStr = "100"
    } else {
        sizeStr = strconv.Itoa(size)
    }
    prama := "{\"query\":{\"bool\":{\"filter\":["+ishubReq+"{\"range\":{\"picDate\":{\"gte\":\"now+8h-" + seccond + "s\",\"lt\":\"now+8h"+preSec+"\"}}}]}},\"size\":\"1000\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," +
    var filterArr []string
    if ishub == "hub" {
        filterArr = append(filterArr,"{\"term\":{\"personIsHub\":\"1\"}}")
    }
    if sec > 60 {
        filterArr = append(filterArr, "{\"range\":{\"picDate\":{\"gte\":\"now+8h-" + seccond + "s\",\"lt\":\"now+8h"+preSec+"\"}}}")
    }
    filterStr := ""
    if len(filterArr) >0 {
        filterStr = strings.Join(filterArr, ",")
    }
    prama := "{\"query\":{\"bool\":{\"filter\":["+filterStr+"]}},\"size\":\""+sizeStr+"\",\"sort\":[{\"picDate\":{\"order\":\"desc\"}}]," +
        "\"_source\":[\"baseInfo\",\"Gender\",\"BaseName\",\"Age\",\"personId\",\"personPicUrl\",\"indeviceName\",\"imgKey\",\"sdkType\",\"ageDescription\",\"indeviceid\",\"content\",\"Id\",\"picAddress\",\"picMaxUrl\",\"picDate\",\"Race\",\"videoNum\",\"picSmUrl\",\"taskName\",\"personIsHub\",\"idcard\",\"videoIp\",\"videoReqNum\"]" +
        "}"
    err, tokenRes := GetEsDataReq(url, prama, picurl, true)
    log.Log.InfoInfoln("条数:",size,"耗时:",time.Since(startTime))
    if err != nil {
        log.Log.Errorln(err)
        return nil
server.go
@@ -8,24 +8,27 @@
    "time"
    "github.com/long/test/esutil"
    "github.com/long/config"
    log "github.com/long/test/log"
)
var addr = flag.String("addr", "0.0.0.0", "The address to listen to;")
var Eurl = flag.String("eurl", "http://192.168.1.182:9200/", "The port to listen on; ")
var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url ")
var addr = flag.String("addr", "0.0.0.0", "The address to listen to")
var Eurl = flag.String("eurl", "http://192.168.1.182:9200/", "The port to listen on")
var Picurl = flag.String("picurl", "http://58.118.225.79:41242/", "picture url")
var port = flag.Int("port", 6000, "The port to listen on; ")
var sec = flag.Int("sec", 10, "the second for query data. ")
var port = flag.Int("port", 6000, "The port to listen on")
var sec = flag.Int("sec", 10, "the second for query data")
var Level = flag.String("level","ErrorLevel","log level")
var IsHub = flag.String("hub", "hub", "hub is personIsHub=1")
var Size = flag.Int("size", 100, "size default is 100")
var env = flag.String("env", "config", "env set")
func main() {
    flag.Parse()
    log.SetLogLevel(*Level)
    config.Init(*env)
    fmt.Println(*port)
    src := *addr + ":" + strconv.Itoa(*port)
    listener, err := net.Listen("tcp", src)
@@ -84,7 +87,7 @@
}
func handleMessage(conn net.Conn, connArr []net.Conn) bool {
    jsonstring := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub)
    jsonstring := esutil.PostAction(*sec, *Eurl, *Picurl, *IsHub, *Size)
    if jsonstring == nil {
        log.Log.Infoln("the data is nil,remoteArr:",conn.RemoteAddr())
        if _, err := conn.Write([]byte("\000"));err !=nil {