554325746@qq.com
2019-05-28 70df7b912014201e271a2966599b84d77addd0f9
test all process
7个文件已修改
4个文件已删除
1150 ■■■■ 已修改文件
camera/camera.go 260 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/.gitignore 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/test.pb.go 738 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/test.proto 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -1,183 +1,141 @@
package camera
import (
    "errors"
        "errors"
    "github.com/long/test/deliver"
    "github.com/long/test/httpclient"
    "github.com/long/test/sdk"
        "basic.com/dbapi.git"
        "basic.com/valib/deliver.git"
        "basic.com/pubsub/protomsg.git"
    "context"
    "encoding/json"
    "fmt"
    "time"
)
        "github.com/long/test/sdk"
/*
* 1.  获取 cid
* 2.  获取 cid和 taskid关系
* 3.  获取 cid ipc communication
 */
var SocketManage = make(map[string]SocketContext)
var Initchannel = make(chan Camerdata)
var UrlPort = 7000
        "context"
        "fmt"
        "sync"
        "time"
       )
type Camerdata struct {
    Cameraid string
    Rtsp     string
}
//var SocketManage = make(map[string]SocketContext)
var SocketManage sync.Map
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
var Initchannel = make(chan string)
func Taskdolist(cid string, taskid string, data []byte) {
    type SocketContext struct {
        Sock    deliver.Deliver
            Context context.Context
            Cancel  context.CancelFunc
    }
    fmt.Println("======================================")
    //  数据加工(打标签)
    sdkmsg := sdk.SdkData(cid, taskid, data)
    if sdkmsg.Tasklab == nil {
        fmt.Println("cid:%s 没有任务%s", cid, taskid)
        return
    }
var camval dbapi.CameraApi
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        fmt.Println("分发的主题存在")
        sdk.SdkMap[SendTopic] <- sdkmsg
        //fmt.Println("重新开始循环: ", sdk.SdkMap)
    } else {
        fmt.Println("分发的主题不存在")
    }
}
//get camera with task
var cameraTasks []protomsg.CameraAndTaskInfo
func Init() {
    CameraRelative()
    fmt.Println()
    fmt.Println("cid,taskid , sid: ", CtsId)
    fmt.Println("============ camera info ====================")
    ctsid := camval.FindAll()
    url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort)
    _, socket, err := NewCamerSocketListen(deliver.Pair, "init", url)
    if err != nil {
        return
    }
    UrlPort++
    fmt.Println("==============camera camera with task ================")
    cameraTasks=camval.FindAllCameraAndTask()
    go SendRecv(socket, Initchannel)
    go CreateCamera(Initchannel)
    var cameratodecode Camerdata
    for {
        time.Sleep(2 * time.Second)
        i := 0
        for _, cam := range CtsId {
            if i < 2 {
                cameratodecode.Cameraid = cam.Cameraid
                cameratodecode.Rtsp = cam.RtspUrl
                Initchannel <- cameratodecode
            }
            i++
        }
    }
    for _, cam := range ctsid {
        Initchannel <- cam.Id
    }
}
func send(socket SocketContext, cam Camerdata) {
    b, err := json.Marshal(cam)
    if err != nil {
        fmt.Println("can not json convert !", cam)
    }
    if err := socket.Sock.Send(b); err != nil {
        fmt.Println("camera info: failed send")
    }
}
func CreateCamera(camera chan string) {
    for camid := range camera {
        if _, ok := SocketManage.Load(camid); !ok {
            url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
func SendRecv(socket SocketContext, camera chan Camerdata) {
    //socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    for cam := range camera {
        send(socket, cam)
        if _, ok := SocketManage[cam.Cameraid]; !ok {
            go func(id string, urlport int) {
                fmt.Println("create cid server: ", id)
                url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", urlport)
                cid, socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
            id, socketlisten, err := NewCamerSocketListen(deliver.PushPull, camid, url)
            if err != nil {
              fmt.Println("create socket error")
              continue
            }
                if err != nil {
                    return
                }
                fmt.Println("input id: ", id, " output id :", cid)
                Recv(cid, socketlisten)
            }(cam.Cameraid, UrlPort)
            UrlPort++
        }
    }
}
// 获取 cid , taskid, sdkid 关系
var CtsId []httpclient.Camerasingle
func CameraRelative() {
    CtsId = httpclient.GetEsDataReq("http://127.0.0.1:8000/data/api-v/camera/queryCameraAndTaskInfo")
}
//   根据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
    for _, camsingle := range CtsId {
        if cid == camsingle.Cameraid {
            for _, tasksingle := range camsingle.TaskList {
                tasks = append(tasks, tasksingle.Taskid)
            }
            return
        }
    }
    return
             go func(cid string, sock SocketContext ){
                  Recv(cid,sock)
             }(id,socketlisten)
        }
    }
}
// create server
func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) {
    fmt.Println("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
        ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
        socket.Context = ctx
        socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    fmt.Println("new socket.Sock: ", socket.Sock)
        socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
        fmt.Println("new socket.Sock: ", socket.Sock)
    if socket.Sock == nil {
        return cameraid, socket, errors.New("create listen error")
    }
    SocketManage[cameraid] = socket
    return cameraid, socket, nil
        if socket.Sock == nil {
            return cameraid, socket, errors.New("create listen error")
        }
        SocketManage.Store(cameraid, socket)
        return cameraid, socket, nil
}
func Recv(cameraid string, socket SocketContext) {
    //    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    var msg []byte
    var err error
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("listen recv quit")
            return
        default:
            if msg, err = socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                fmt.Println("cameraid: ", len(msg))
                for _, taskid := range GetAlltask(cameraid) {
                    Taskdolist(cameraid, taskid, msg)
                    //    fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
                }
            }
        }
    }
        var msg []byte
        var err error
        for {
            select {
                case <-socket.Context.Done():
                    fmt.Println("listen recv quit")
                        return
                default:
                        if msg, err = socket.Sock.Recv(); err != nil {
                            fmt.Println("err is: ", cameraid, err)
                                continue
                        } else {
                            fmt.Println()
                            fmt.Println("============== one msg input ==========")
                            fmt.Println("cameraid: ",cameraid,  len(msg))
                            for _, taskid := range GetAlltask(cameraid) {
                                time.Sleep(5* time.Second)
                                fmt.Println("cameraid: ",cameraid," taskid: ", taskid)
                                Taskdolist(cameraid, taskid, msg)
                             }
                        }
            }
        }
}
//   根据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
    for _, camsingle := range cameraTasks {
        if cid == camsingle.Camera.Id {
            for _, tasksingle := range camsingle.Tasks {
                tasks = append(tasks, tasksingle.Taskid)
            }
                return
        }
    }
    return
}
func Taskdolist(cid string, taskid string, data []byte) {
        //  数据加工(打标签)
        sdkmsg := sdk.SdkData(cid, taskid, data)
        if sdkmsg.Tasklab == nil {
            fmt.Println("cid:%s 没有任务%s", cid, taskid)
                return
        }
    //  计算分发的主题
            SendTopic := sdk.SdkSendTopic(sdkmsg)
               if _, ok := sdk.SdkMap[SendTopic]; ok {
                   fmt.Println("分发的主题存在")
                       sdk.SdkMap[SendTopic] <- sdkmsg
                       //fmt.Println("重新开始循环: ", sdk.SdkMap)
               } else {
                   fmt.Println("分发的主题不存在")
               }
}
deliver
File was deleted
go.mod
@@ -3,12 +3,15 @@
go 1.12
require (
    github.com/Microsoft/go-winio v0.4.12 // indirect
    basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b
    basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd
    basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c
    github.com/gogo/protobuf v1.2.1 // indirect
    github.com/golang/protobuf v1.3.1
    github.com/gorilla/websocket v1.4.0 // indirect
    github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
    github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
    github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
    golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
    golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5 // indirect
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -1,16 +1,31 @@
github.com/Microsoft/go-winio v0.4.12 h1:xAfWHN1IrQ0NJ9TBC0KBZoqLjzDTr1ML+4MywiUOryc=
github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
basic.com/dbapi.git v0.0.0-20190523025708-eaf1da6b55de h1:tkAqiVXaBz8upBGGu60jExv0H5H7m2OWZdR8aTAJkp0=
basic.com/dbapi.git v0.0.0-20190523025708-eaf1da6b55de/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b h1:gzr51BWE821BzyhRb0iiP5Wu/yXTkbfcz0BkzfjacMs=
basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/pubsub/protomsg.git v0.0.0-20190523080134-c2459cf7ffa7 h1:AmdkBGk95CJy0jMU2SJkQNgldpPCMyAifeAPxUngQsw=
basic.com/pubsub/protomsg.git v0.0.0-20190523080134-c2459cf7ffa7/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190524044418-e6c6e5fdcdab h1:kTHZgvhdEJ+Vdbi1bBhKRA2oTYMhLZDqpWWk40yUd3s=
basic.com/pubsub/protomsg.git v0.0.0-20190524044418-e6c6e5fdcdab/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd h1:Z1KVegr3JrNHaJFAv6yRniNIWdvzLWBPkpBRnpzgnYg=
basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c h1:nyclQo40lBhvt2LnsaG/tNyxuotKou0V67jL1jBcJfM=
basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5 h1:f005F/Jl5JLP036x7QIvUVhNTqxvSYwFIiyOh2q12iU=
golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
main.go
@@ -1,7 +1,6 @@
package main
import (
    "fmt"
    "log"
    "net/http"
    _ "net/http/pprof"
@@ -11,24 +10,19 @@
    //    "github.com/long/test/httpclient"
    "github.com/long/test/sdk"
    "github.com/long/test/tasktag"
    //"time"
    "time"
)
func main() {
    // pprof 用于分析性能
    go func() {
        log.Println(http.ListenAndServe("192.168.1.124:6060", nil))
        log.Println(http.ListenAndServe("192.168.1.123:6060", nil))
    }()
    sdk.Init()     //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签
    camera.Init()  //获取cid, taskid, sdkid ,关系
    fmt.Println()
    fmt.Println("===================================")
    fmt.Println()
    for {
        time.Sleep(time.Second)
    }
}
protomsg/.gitignore
File was deleted
protomsg/test.pb.go
File was deleted
protomsg/test.proto
File was deleted
sdk/sdk.go
@@ -5,17 +5,25 @@
    "errors"
    "fmt"
    "github.com/long/test/httpclient"
    "github.com/long/test/protomsg"
//    "github.com/long/test/httpclient"
    "github.com/long/test/tasktag"
    "github.com/long/test/util"
    "github.com/golang/protobuf/proto"
    "github.com/long/test/deliver"
    "basic.com/valib/deliver.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/dbapi.git"
)
//var doOnce sync.Once
const (
    postPull="_1.ipc"
    postPush="_2.ipc"
)
var SocketManage = make(map[string]SocketContext)
var sdkapi dbapi.SdkApi
type SocketContext struct {
    Sock    deliver.Deliver
@@ -25,32 +33,27 @@
func Init() {
    fmt.Println("============= init sdk info =====================")
    sdklist := SdkAll() //获取所有sdk
    fmt.Println("sdk list have: ", sdklist)
    SdkCreateTopic(sdklist) // 创建主题
    for _, sdkid := range sdklist { // 创建sdk server
        url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort)
        url := fmt.Sprintf("ipc:///tmp/%s%s",sdkid,postPull)
        sdkidser, socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
        if err != nil {
            continue
        }
        UrlPort++
        go Send(sdkidser, socketser, SdkMap[sdkid])
        url = fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPortR)
        url = fmt.Sprintf("ipc:///tmp/%s%s",sdkid,postPush)
        _, socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
        if err != nil {
            continue
        }
        UrlPortR++
        go Recv(socketdial)
    }
    go es(SdkMap["es"])
}
//单独处理   es 主题的情况
@@ -58,7 +61,6 @@
    for _ = range sdkmsgchan {
        fmt.Println("this data is finish all sdk! ")
    }
}
//动态处理
@@ -92,7 +94,6 @@
        sdkmsg.Tasklab = nil
        return sdkmsg
    }
    sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
    sdkmsg.Data = data
    return sdkmsg
@@ -105,15 +106,14 @@
    } else {
        sdksend = "es"
    }
    fmt.Println()
    fmt.Println("分发的主题是: ", sdksend)
    fmt.Println()
    return
}
// 调用  http 借口获取摄像机信息
func SdkAll() (sdklist []string) {
    sdklist = httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid")
    //sdklist = httpclient.GetSdk("http://192.168.1.124:8000/data/api-v/sdk/findskdid")
    sdklist = sdkapi.GetAllSdkIds()
    return
}
@@ -130,8 +130,6 @@
    return nil
}
var UrlPort = 9000
var UrlPortR = 9500
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
@@ -216,6 +214,7 @@
            fmt.Println("send len of data: ", len(data))
            if err := socket.Sock.Send(data); err != nil {
                fmt.Println(socket.Sock)
                fmt.Println("failed send")
                continue
            }
tasktag/tasktag.go
@@ -2,26 +2,33 @@
import (
    "fmt"
    "github.com/long/test/protomsg"
    "basic.com/pubsub/protomsg.git"
    "basic.com/dbapi.git"
)
var TaskMapLab = make(map[string]*protomsg.TaskLabel)
var TaskMapLab = make(map[string] *protomsg.TaskLabel)
var taskapi dbapi.TaskApi
//
func Init() {
    fmt.Println("=========== tasktag info ====================")
    var tls []protomsg.TaskLabel
    sdk1 := "812b674b-2375-4589-919a-5c1c3278a972"
    sdk2 := "812b674b-2375-4589-919a-5c1c3278a971"
    task1 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d1", Sdkids: []string{sdk1, sdk2}, Index: int32(0)}
    tls = append(tls, task1)
    task2 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d2", Sdkids: []string{sdk1}, Index: int32(0)}
    tls = append(tls, task2)
     taskSdks := taskapi.FindAll()
     for _, taskSdk := range taskSdks {
         fmt.Println("test:    ", taskSdk)
         var tl  protomsg.TaskLabel
         tl.Taskid = taskSdk.Task.Taskid
         for _, sdkinfo := range taskSdk.Sdks {
             tl.Sdkids = append(tl.Sdkids, sdkinfo.Id)
         }
         fmt.Println(tl)
         tl.Index=int32(0)
         tls = append(tls, tl)
     }
    GenTasklab(tls)
    for key, value := range TaskMapLab {
        fmt.Println()
        fmt.Println(key, value)
@@ -32,6 +39,7 @@
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []protomsg.TaskLabel) {
    for _, value := range tasklab {
        TaskMapLab[value.Taskid] = &value
        pv := value
        TaskMapLab[value.Taskid] = &pv
    }
}
test
Binary files differ