longganhua
2019-05-09 66198ce78d2c81ef6e962bccae23ae0ecbf7acd2
release first version
10个文件已添加
663 ■■■■■ 已修改文件
camera/camera.go 198 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
httpclient/es_test.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
httpclient/esutil.go 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 225 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
New file
@@ -0,0 +1,198 @@
package camera
import (
    "github.com/long/test/httpclient"
    "github.com/long/test/sdk"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "context"
    "encoding/json"
    "fmt"
    "time"
)
/*
* 1.  获取 cid
* 2.  获取 cid和 taskid关系
* 3.  获取 cid ipc communication
 */
var SocketManage = make(map[string]SocketContext)
var Initchannel = make(chan Camerdata)
var UrlPort = 7000
type Camerdata struct {
    Cameraid string
    Rtsp     string
}
type SocketContext struct {
    Sock    mangos.Socket
    Context context.Context
    Cancel  context.CancelFunc
}
func Taskdolist(cid string, taskid string, data []byte) {
    //  数据加工(打标签)
    sdkmsg := sdk.SdkData(cid, taskid, data)
    fmt.Println("============================")
    fmt.Println("sdk  打标签: ", cid, taskid, len(data))
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        fmt.Println("分发的主题存在")
        sdk.SdkMap[SendTopic] <- sdkmsg
        //fmt.Println("重新开始循环: ", sdk.SdkMap)
    } else {
        fmt.Println("分发的主题不存在")
    }
}
func Init() {
    CameraRelative()
    fmt.Println()
    fmt.Println("cid,taskid , sid: ", CtsId)
    _, socket, err := NewCamerSocketListen("init", "tcp", "192.168.1.124", 0)
    if err != nil {
        return
    }
    go SendRecv(socket, Initchannel)
    var cameratodecode Camerdata
    for {
        time.Sleep(2 * time.Second)
        i := 0
        for _, cam := range CtsId {
            if i < 2 {
                cameratodecode.Cameraid = cam.Cameraid
                cameratodecode.Rtsp = cam.RtspUrl
                Initchannel <- cameratodecode
            }
            i++
        }
    }
}
func Recv(cameraid string, socket SocketContext) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    var msg []byte
    var err error
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("listen recv quit")
            return
        default:
            if msg, err = socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                for _, taskid := range GetAlltask(cameraid) {
                    Taskdolist(cameraid, taskid, msg)
                    fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
                }
            }
        }
    }
}
func send(socket SocketContext, cam Camerdata) {
    b, err := json.Marshal(cam)
    if err != nil {
        fmt.Println("can not json convert !", cam)
    }
    if err := socket.Sock.Send(b); err != nil {
        fmt.Println("failed send")
    }
}
func SendRecv(socket SocketContext, camera chan Camerdata) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    for cam := range camera {
        send(socket, cam)
        if _, ok := SocketManage[cam.Cameraid]; !ok {
            go func(id string) {
                fmt.Println("create cid server: ", id)
                cid, socketlisten, _ := NewCamerSocketListen(id, "tcp", "192.168.1.124", 0)
                fmt.Println("input id: ", id, " output id :", cid)
                Recv(cid, socketlisten)
            }(cam.Cameraid)
        }
    }
}
// new camera server
func NewCamerSocketListen(cameraid string, protocol string, ip string, port int) (cid string, socket SocketContext, err error) {
    ctx, cancel := context.WithCancel(context.Background())
    var url string
    socket.Context = ctx
    socket.Cancel = cancel
    switch protocol {
    case "tcp":
        if port == 0 {
            port = UrlPort
            UrlPort++
        }
        url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
    case "ipc":
        url = fmt.Sprintf("%s://%s", cameraid)
    }
    fmt.Println(url)
    if socket.Sock, err = pair.NewSocket(); err != nil {
        fmt.Println(cameraid, "can't get new pair socket: ", err.Error())
        return cameraid, socket, err
    }
    socket.Sock.AddTransport(tcp.NewTransport())
    socket.Sock.AddTransport(ipc.NewTransport())
    socket.Sock.SetOption(mangos.OptionMaxRecvSize, 32*1024*1024)
    socket.Sock.SetOption(mangos.OptionWriteQLen, 10)
    socket.Sock.SetOption(mangos.OptionReadQLen, 10)
    if err = socket.Sock.Listen(url); err != nil {
        fmt.Println("socket lisnte error ", cameraid)
    }
    fmt.Println(cameraid, "success")
    SocketManage[cameraid] = socket
    return cameraid, socket, err
}
// 获取 cid , taskid, sdkid 关系
var CtsId []httpclient.Camerasingle
func CameraRelative() {
    CtsId = httpclient.GetEsDataReq("http://127.0.0.1:8000/data/api-v/camera/queryCameraAndTaskInfo")
}
//   根据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
    for _, camsingle := range CtsId {
        if cid == camsingle.Cameraid {
            for _, tasksingle := range camsingle.TaskList {
                tasks = append(tasks, tasksingle.Taskid)
            }
            return
        }
    }
    return
}
go.mod
New file
@@ -0,0 +1,9 @@
module github.com/long/test
go 1.12
require (
    github.com/Microsoft/go-winio v0.4.12 // indirect
    golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
    nanomsg.org/go-mangos v1.4.0
)
go.sum
New file
@@ -0,0 +1,6 @@
github.com/Microsoft/go-winio v0.4.12 h1:xAfWHN1IrQ0NJ9TBC0KBZoqLjzDTr1ML+4MywiUOryc=
github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
httpclient/es_test.go
New file
@@ -0,0 +1,34 @@
package httpclient
import (
    "testing"
)
//func TestGetEsDataReq(t *testing.T) {
//    var queryparam = `{
//                    "query":{
//                        "match_all":{}
//                            },
//                                "sort":[{"picDate":{"order":"desc"}}],
//                                    "size":"10",
//                                        "_source":["Id","FaceFeature","picDate"]
//                                    }`
//
//    //testmap := GetEsDataReq("http://192.168.1.182:9200/videopersons/_search", queryparam, true)
//    //    t.Log(testmap)
//    GetEsDataReq("http://192.168.1.182:9200/videopersons/_search", queryparam, true)
//    t.Log("success")
//
//}
func TestGetEsDataReq(t *testing.T) {
    GetEsDataReq("http://192.168.1.133:8000/data/api-v/camera/queryCameraAndTaskInfo")
    t.Log("success")
}
func TestGetSdk(t *testing.T) {
    sdks := GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid")
    t.Log(sdks)
}
httpclient/esutil.go
New file
@@ -0,0 +1,79 @@
package httpclient
import (
    "encoding/json"
    "fmt"
    "net/http"
)
type CamerSdkStruct struct {
    Sdkid   string `json:"sdkId"`
    Sdkname string `json:"sdkName"`
    Sdkargs string `json:"sdkArgs`
}
type Tasksingle struct {
    Taskid  string           `json:"taskid"`
    SdkList []CamerSdkStruct `json:"sdklist"`
}
type Camerasingle struct {
    Cameraid string       `json:"cameraid"`
    RtspUrl  string       `json:"rtspUrl"`
    TaskList []Tasksingle `json:tasklist`
}
//  get relative of camera, task and sdk
func GetEsDataReq(url string, parma ...string) []Camerasingle {
    fmt.Println("查询请求路径" + url) //  配置信息 获取
    client := &http.Client{}
    req, err := http.NewRequest("GET", url, nil)
    req.Header.Add("Content-Type", "application/json")
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println(err)
    }
    defer resp.Body.Close()
    var responsedata []Camerasingle
    err = json.NewDecoder(resp.Body).Decode(&responsedata)
    if err != nil {
        fmt.Println(err)
    }
    return responsedata
}
//   获取所有的算法id
type sdkInfo struct {
    SdkId string `json:"SdkId"`
}
func GetSdk(url string) []string {
    var sdkslice []string
    fmt.Println("查询请求路径" + url) //  配置信息 获取
    client := &http.Client{}
    req, err := http.NewRequest("GET", url, nil)
    req.Header.Add("Content-Type", "application/json")
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println(err)
    }
    defer resp.Body.Close()
    var sdkIds []sdkInfo
    err = json.NewDecoder(resp.Body).Decode(&sdkIds)
    if err != nil {
        fmt.Println(err)
    }
    for _, sdksingle := range sdkIds {
        sdkslice = append(sdkslice, sdksingle.SdkId)
    }
    return sdkslice
}
main.go
New file
@@ -0,0 +1,34 @@
package main
import (
    "fmt"
    "log"
    "net/http"
    _ "net/http/pprof"
    "github.com/long/test/camera"
    //    "github.com/long/test/httpclient"
    "github.com/long/test/sdk"
    "github.com/long/test/tasktag"
    //"time"
)
func main() {
    // pprof 用于分析性能
    go func() {
        log.Println(http.ListenAndServe("192.168.1.124:6060", nil))
    }()
    sdk.Init()     //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签
    camera.Init()  //获取cid, taskid, sdkid ,关系
    fmt.Println()
    fmt.Println("===================================")
    fmt.Println()
    for {
    }
}
sdk/sdk.go
New file
@@ -0,0 +1,225 @@
package sdk
import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/long/test/httpclient"
    "github.com/long/test/tasktag"
    "github.com/long/test/util"
    "time"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
var SocketManage = make(map[string]SocketContext)
type SocketContext struct {
    Sock    mangos.Socket
    Context context.Context
    Cancel  context.CancelFunc
}
func Init() {
    sdklist := SdkAll() //获取所有sdk
    fmt.Println("sdk list have: ", sdklist)
    SdkCreateTopic(sdklist) // 创建主题
    for _, sdkid := range sdklist { // 创建sdk server
        sdkid, socket, err := NewSdkListen(sdkid, "tcp", "192.168.1.124", 0)
        if err != nil {
            continue
        }
        // 接受管道数据 ==》 发送给 对应的进程
        go send(sdkid, socket, SdkMap[sdkid])
        //从对应进程接受数据 == 》 重新送回到管道
        go Recv(socket)
    }
    go es(SdkMap["es"])
}
//单独处理   es 主题的情况
func es(sdkmsgchan chan SdkMessage) {
    for data := range sdkmsgchan {
        fmt.Println("this data is finish all sdk! ", data)
    }
}
//动态处理
func AutoDelSdk(Newsdklist []string) {
    var oldSdk []string
    for key, _ := range SdkMap {
        oldSdk = append(oldSdk, key)
    }
    sdkChanDel := util.Difference(oldSdk, Newsdklist)
    for key, op := range sdkChanDel {
        if op == "add" {
            SdkMap[key] = make(chan SdkMessage)
            fmt.Println("创建主题 sdk: ", key)
        } else {
            close(SdkMap[key])
            delete(SdkMap, key)
            fmt.Println("删除主题 sdk: ", key)
        }
    }
}
//主题
var SdkMap = make(map[string]chan SdkMessage)
// 发送给算法进程的结构
type SdkMessage struct {
    Cid     string
    Tasklab tasktag.TaskLabel
    Data    []byte
}
//sdk数据 加工器
func SdkData(cid string, taskid string, data []byte) (sdkmsg SdkMessage) {
    sdkmsg.Cid = cid
    sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
    sdkmsg.Data = data
    return
}
//sdk数据分发器
func SdkSendTopic(sdkmsg SdkMessage) (sdksend string) {
    if sdkmsg.Tasklab.Index < len(sdkmsg.Tasklab.Sdkids) {
        sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
    } else {
        sdksend = "es"
    }
    fmt.Println()
    fmt.Println("分发的主题是: ", sdksend)
    fmt.Println()
    return
}
// 调用  http 借口获取摄像机信息
func SdkAll() (sdklist []string) {
    sdklist = httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid")
    return
}
// 创建主题
func SdkCreateTopic(sdklist []string) (err error) {
    for _, sdkid := range sdklist {
        SdkMap[sdkid] = make(chan SdkMessage)
        fmt.Println("create sdk channel:  ", sdkid)
    }
    SdkMap["es"] = make(chan SdkMessage)
    fmt.Println("create es channel:  ")
    return nil
}
var UrlPort = 9000
// 创建 sdk server  listen
func NewSdkListen(sdkid string, protocol string, ip string, port int) (sid string, socket SocketContext, err error) {
    ctx, cancel := context.WithCancel(context.Background())
    var url string
    socket.Context = ctx
    socket.Cancel = cancel
    switch protocol {
    case "tcp":
        if port == 0 {
            port = UrlPort
            UrlPort++
        }
        url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
    case "ipc":
        url = fmt.Sprintf("%s://%s", sdkid)
    }
    fmt.Printf("sdkid= %s  url=%s\n", sdkid, url)
    if socket.Sock, err = pair.NewSocket(); err != nil {
        fmt.Println(sdkid, "can't get new pair socket: ", err.Error())
        return sdkid, socket, err
    }
    socket.Sock.SetOption(mangos.OptionMaxRecvSize, 32*1024*1024)
    socket.Sock.SetOption(mangos.OptionWriteQLen, 10)
    socket.Sock.SetOption(mangos.OptionReadQLen, 10)
    socket.Sock.AddTransport(tcp.NewTransport())
    socket.Sock.AddTransport(ipc.NewTransport())
    if err = socket.Sock.Listen(url); err != nil {
        fmt.Println("socket lisnte error ", sdkid)
    }
    SocketManage[sdkid] = socket
    return sdkid, socket, err
}
func Recv(socket SocketContext) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket close")
            return
        default:
            if msg, err := socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                var repsdkmsg SdkMessage
                var reps interface{}
                err = json.Unmarshal(msg, &reps)
                if err != nil {
                    continue
                }
                switch v := reps.(type) {
                case map[string]interface{}:
                    //调用计算函数, 分发给下一个主题
                    json.Unmarshal(msg, &repsdkmsg)
                    nexttopic := SdkSendTopic(repsdkmsg)
                    SdkMap[nexttopic] <- repsdkmsg
                case string:
                    fmt.Println("this string is: ", v)
                }
            }
        }
    }
}
func send(sdkid string, socket SocketContext, in chan SdkMessage) {
    var v SdkMessage
    var b []byte
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket is close")
        case v = <-in:
            b, _ = json.Marshal(v)
            fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(v.Data))
            if err := socket.Sock.Send(b); err != nil {
                fmt.Println("failed send")
            }
            fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(v.Data))
        }
    }
}
tasktag/tasktag.go
New file
@@ -0,0 +1,45 @@
package tasktag
import (
    "fmt"
)
/* 任务标签生成器
 *  描述: 用于在数据进入到特定的task时, 给这个数据加上任务标签,这样就能知道算法流程了
 */
type TaskLabel struct {
    Taskid string
    Sdkids []string
    Index  int
}
var TaskMapLab = make(map[string]TaskLabel)
//
func Init() {
    var tls []TaskLabel
    sdk1 := "812b674b-2375-4589-919a-5c1c3278a972"
    sdk2 := "812b674b-2375-4589-919a-5c1c3278a971"
    task1 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d1", []string{sdk1, sdk2}, 0}
    tls = append(tls, task1)
    task2 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d2", []string{sdk2}, 0}
    tls = append(tls, task2)
    GenTasklab(tls)
    for key, value := range TaskMapLab {
        fmt.Println()
        fmt.Println(key, value)
    }
}
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []TaskLabel) {
    for _, value := range tasklab {
        TaskMapLab[value.Taskid] = value
    }
}
test
Binary files differ
util/util.go
New file
@@ -0,0 +1,33 @@
package util
//  1. oldstring element is not in new  : abandon(delete)
//  2. new element is not in oldstring  : add(add)
func Difference(oldstring []string, newstring []string) map[string]string {
    var diff = make(map[string]string)
    // Loop two times, first to find oldstring strings not in newstring,
    // second loop to find newstring strings not in oldstring
    for i := 0; i < 2; i++ {
        for _, s1 := range oldstring {
            found := false
            for _, s2 := range newstring {
                if s1 == s2 {
                    found = true
                    break
                }
            }
            // String not found. We add it to return slice
            if !found && i == 0 {
                diff[s1] = "delete"
            }
            if !found && i != 0 {
                diff[s1] = "add"
            }
        }
        // Swap the slices, only if it was the first loop
        if i == 0 {
            oldstring, newstring = newstring, oldstring
        }
    }
    return diff
}