| camera/camera.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| deliver | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| go.mod | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| go.sum | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| main.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| protomsg/.gitignore | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| protomsg/test.pb.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| protomsg/test.proto | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| sdk/sdk.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| tasktag/tasktag.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| test | 补丁 | 查看 | 原始文档 | blame | 历史 |
camera/camera.go
@@ -1,183 +1,141 @@ package camera import ( "errors" "errors" "github.com/long/test/deliver" "github.com/long/test/httpclient" "github.com/long/test/sdk" "basic.com/dbapi.git" "basic.com/valib/deliver.git" "basic.com/pubsub/protomsg.git" "context" "encoding/json" "fmt" "time" ) "github.com/long/test/sdk" /* * 1. 获取 cid * 2. 获取 cid和 taskid关系 * 3. 获取 cid ipc communication */ var SocketManage = make(map[string]SocketContext) var Initchannel = make(chan Camerdata) var UrlPort = 7000 "context" "fmt" "sync" "time" ) type Camerdata struct { Cameraid string Rtsp string } //var SocketManage = make(map[string]SocketContext) var SocketManage sync.Map type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } var Initchannel = make(chan string) func Taskdolist(cid string, taskid string, data []byte) { type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } fmt.Println("======================================") // 数据加工(打标签) sdkmsg := sdk.SdkData(cid, taskid, data) if sdkmsg.Tasklab == nil { fmt.Println("cid:%s 没有任务%s", cid, taskid) return } var camval dbapi.CameraApi // 计算分发的主题 SendTopic := sdk.SdkSendTopic(sdkmsg) if _, ok := sdk.SdkMap[SendTopic]; ok { fmt.Println("分发的主题存在") sdk.SdkMap[SendTopic] <- sdkmsg //fmt.Println("重新开始循环: ", sdk.SdkMap) } else { fmt.Println("分发的主题不存在") } } //get camera with task var cameraTasks []protomsg.CameraAndTaskInfo func Init() { CameraRelative() fmt.Println() fmt.Println("cid,taskid , sid: ", CtsId) fmt.Println("============ camera info ====================") ctsid := camval.FindAll() url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort) _, socket, err := NewCamerSocketListen(deliver.Pair, "init", url) if err != nil { return } UrlPort++ fmt.Println("==============camera camera with task ================") cameraTasks=camval.FindAllCameraAndTask() go SendRecv(socket, Initchannel) go CreateCamera(Initchannel) var cameratodecode Camerdata for { time.Sleep(2 * time.Second) i := 0 for _, cam := range CtsId { if i < 2 { cameratodecode.Cameraid = cam.Cameraid cameratodecode.Rtsp = cam.RtspUrl Initchannel <- cameratodecode } i++ } } for _, cam := range ctsid { Initchannel <- cam.Id } } func send(socket SocketContext, cam Camerdata) { b, err := json.Marshal(cam) if err != nil { fmt.Println("can not json convert !", cam) } if err := socket.Sock.Send(b); err != nil { fmt.Println("camera info: failed send") } } func CreateCamera(camera chan string) { for camid := range camera { if _, ok := SocketManage.Load(camid); !ok { url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid) func SendRecv(socket SocketContext, camera chan Camerdata) { //socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second) for cam := range camera { send(socket, cam) if _, ok := SocketManage[cam.Cameraid]; !ok { go func(id string, urlport int) { fmt.Println("create cid server: ", id) url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", urlport) cid, socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url) id, socketlisten, err := NewCamerSocketListen(deliver.PushPull, camid, url) if err != nil { fmt.Println("create socket error") continue } if err != nil { return } fmt.Println("input id: ", id, " output id :", cid) Recv(cid, socketlisten) }(cam.Cameraid, UrlPort) UrlPort++ } } } // 获取 cid , taskid, sdkid 关系 var CtsId []httpclient.Camerasingle func CameraRelative() { CtsId = httpclient.GetEsDataReq("http://127.0.0.1:8000/data/api-v/camera/queryCameraAndTaskInfo") } // 根据cid 获取 所有的任务 func GetAlltask(cid string) (tasks []string) { for _, camsingle := range CtsId { if cid == camsingle.Cameraid { for _, tasksingle := range camsingle.TaskList { tasks = append(tasks, tasksingle.Taskid) } return } } return go func(cid string, sock SocketContext ){ Recv(cid,sock) }(id,socketlisten) } } } // create server func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) { fmt.Println("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewServer(deliver.Mode(mode), url) fmt.Println("new socket.Sock: ", socket.Sock) socket.Sock = deliver.NewServer(deliver.Mode(mode), url) fmt.Println("new socket.Sock: ", socket.Sock) if socket.Sock == nil { return cameraid, socket, errors.New("create listen error") } SocketManage[cameraid] = socket return cameraid, socket, nil if socket.Sock == nil { return cameraid, socket, errors.New("create listen error") } SocketManage.Store(cameraid, socket) return cameraid, socket, nil } func Recv(cameraid string, socket SocketContext) { // socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second) var msg []byte var err error for { select { case <-socket.Context.Done(): fmt.Println("listen recv quit") return default: if msg, err = socket.Sock.Recv(); err != nil { //fmt.Printf("%s ", err) continue } else { fmt.Println("cameraid: ", len(msg)) for _, taskid := range GetAlltask(cameraid) { Taskdolist(cameraid, taskid, msg) // fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid) } } } } var msg []byte var err error for { select { case <-socket.Context.Done(): fmt.Println("listen recv quit") return default: if msg, err = socket.Sock.Recv(); err != nil { fmt.Println("err is: ", cameraid, err) continue } else { fmt.Println() fmt.Println("============== one msg input ==========") fmt.Println("cameraid: ",cameraid, len(msg)) for _, taskid := range GetAlltask(cameraid) { time.Sleep(5* time.Second) fmt.Println("cameraid: ",cameraid," taskid: ", taskid) Taskdolist(cameraid, taskid, msg) } } } } } // 根据cid 获取 所有的任务 func GetAlltask(cid string) (tasks []string) { for _, camsingle := range cameraTasks { if cid == camsingle.Camera.Id { for _, tasksingle := range camsingle.Tasks { tasks = append(tasks, tasksingle.Taskid) } return } } return } func Taskdolist(cid string, taskid string, data []byte) { // 数据加工(打标签) sdkmsg := sdk.SdkData(cid, taskid, data) if sdkmsg.Tasklab == nil { fmt.Println("cid:%s 没有任务%s", cid, taskid) return } // 计算分发的主题 SendTopic := sdk.SdkSendTopic(sdkmsg) if _, ok := sdk.SdkMap[SendTopic]; ok { fmt.Println("分发的主题存在") sdk.SdkMap[SendTopic] <- sdkmsg //fmt.Println("重新开始循环: ", sdk.SdkMap) } else { fmt.Println("分发的主题不存在") } } deliver
File was deleted go.mod
@@ -3,12 +3,15 @@ go 1.12 require ( github.com/Microsoft/go-winio v0.4.12 // indirect basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/protobuf v1.3.1 github.com/gorilla/websocket v1.4.0 // indirect github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5 // indirect nanomsg.org/go-mangos v1.4.0 ) go.sum
@@ -1,16 +1,31 @@ github.com/Microsoft/go-winio v0.4.12 h1:xAfWHN1IrQ0NJ9TBC0KBZoqLjzDTr1ML+4MywiUOryc= github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= basic.com/dbapi.git v0.0.0-20190523025708-eaf1da6b55de h1:tkAqiVXaBz8upBGGu60jExv0H5H7m2OWZdR8aTAJkp0= basic.com/dbapi.git v0.0.0-20190523025708-eaf1da6b55de/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q= basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b h1:gzr51BWE821BzyhRb0iiP5Wu/yXTkbfcz0BkzfjacMs= basic.com/dbapi.git v0.0.0-20190523103034-c0d33072098b/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q= basic.com/pubsub/protomsg.git v0.0.0-20190523080134-c2459cf7ffa7 h1:AmdkBGk95CJy0jMU2SJkQNgldpPCMyAifeAPxUngQsw= basic.com/pubsub/protomsg.git v0.0.0-20190523080134-c2459cf7ffa7/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= basic.com/pubsub/protomsg.git v0.0.0-20190524044418-e6c6e5fdcdab h1:kTHZgvhdEJ+Vdbi1bBhKRA2oTYMhLZDqpWWk40yUd3s= basic.com/pubsub/protomsg.git v0.0.0-20190524044418-e6c6e5fdcdab/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd h1:Z1KVegr3JrNHaJFAv6yRniNIWdvzLWBPkpBRnpzgnYg= basic.com/pubsub/protomsg.git v0.0.0-20190524072049-ce8c0f2b14dd/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c h1:nyclQo40lBhvt2LnsaG/tNyxuotKou0V67jL1jBcJfM= basic.com/valib/deliver.git v0.0.0-20190522053509-c2bbe31e7c6c/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U= github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8= github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI= github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs= github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0= github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk= golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc= golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5 h1:f005F/Jl5JLP036x7QIvUVhNTqxvSYwFIiyOh2q12iU= golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM= nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ= main.go
@@ -1,7 +1,6 @@ package main import ( "fmt" "log" "net/http" _ "net/http/pprof" @@ -11,24 +10,19 @@ // "github.com/long/test/httpclient" "github.com/long/test/sdk" "github.com/long/test/tasktag" //"time" "time" ) func main() { // pprof 用于分析性能 go func() { log.Println(http.ListenAndServe("192.168.1.124:6060", nil)) log.Println(http.ListenAndServe("192.168.1.123:6060", nil)) }() sdk.Init() // 获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行) tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签 camera.Init() //获取cid, taskid, sdkid ,关系 fmt.Println() fmt.Println("===================================") fmt.Println() for { time.Sleep(time.Second) } } protomsg/.gitignore
File was deleted protomsg/test.pb.go
File was deleted protomsg/test.proto
File was deleted sdk/sdk.go
@@ -5,17 +5,25 @@ "errors" "fmt" "github.com/long/test/httpclient" "github.com/long/test/protomsg" // "github.com/long/test/httpclient" "github.com/long/test/tasktag" "github.com/long/test/util" "github.com/golang/protobuf/proto" "github.com/long/test/deliver" "basic.com/valib/deliver.git" "basic.com/pubsub/protomsg.git" "basic.com/dbapi.git" ) //var doOnce sync.Once const ( postPull="_1.ipc" postPush="_2.ipc" ) var SocketManage = make(map[string]SocketContext) var sdkapi dbapi.SdkApi type SocketContext struct { Sock deliver.Deliver @@ -25,32 +33,27 @@ func Init() { fmt.Println("============= init sdk info =====================") sdklist := SdkAll() //获取所有sdk fmt.Println("sdk list have: ", sdklist) SdkCreateTopic(sdklist) // 创建主题 for _, sdkid := range sdklist { // 创建sdk server url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort) url := fmt.Sprintf("ipc:///tmp/%s%s",sdkid,postPull) sdkidser, socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { continue } UrlPort++ go Send(sdkidser, socketser, SdkMap[sdkid]) url = fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPortR) url = fmt.Sprintf("ipc:///tmp/%s%s",sdkid,postPush) _, socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { continue } UrlPortR++ go Recv(socketdial) } go es(SdkMap["es"]) } //单独处理 es 主题的情况 @@ -58,7 +61,6 @@ for _ = range sdkmsgchan { fmt.Println("this data is finish all sdk! ") } } //动态处理 @@ -92,7 +94,6 @@ sdkmsg.Tasklab = nil return sdkmsg } sdkmsg.Tasklab = tasktag.TaskMapLab[taskid] sdkmsg.Data = data return sdkmsg @@ -105,15 +106,14 @@ } else { sdksend = "es" } fmt.Println() fmt.Println("分发的主题是: ", sdksend) fmt.Println() return } // 调用 http 借口获取摄像机信息 func SdkAll() (sdklist []string) { sdklist = httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid") //sdklist = httpclient.GetSdk("http://192.168.1.124:8000/data/api-v/sdk/findskdid") sdklist = sdkapi.GetAllSdkIds() return } @@ -130,8 +130,6 @@ return nil } var UrlPort = 9000 var UrlPortR = 9500 // create server func NewSdkSocketListen(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { @@ -216,6 +214,7 @@ fmt.Println("send len of data: ", len(data)) if err := socket.Sock.Send(data); err != nil { fmt.Println(socket.Sock) fmt.Println("failed send") continue } tasktag/tasktag.go
@@ -2,26 +2,33 @@ import ( "fmt" "github.com/long/test/protomsg" "basic.com/pubsub/protomsg.git" "basic.com/dbapi.git" ) var TaskMapLab = make(map[string]*protomsg.TaskLabel) var TaskMapLab = make(map[string] *protomsg.TaskLabel) var taskapi dbapi.TaskApi // func Init() { fmt.Println("=========== tasktag info ====================") var tls []protomsg.TaskLabel sdk1 := "812b674b-2375-4589-919a-5c1c3278a972" sdk2 := "812b674b-2375-4589-919a-5c1c3278a971" task1 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d1", Sdkids: []string{sdk1, sdk2}, Index: int32(0)} tls = append(tls, task1) task2 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d2", Sdkids: []string{sdk1}, Index: int32(0)} tls = append(tls, task2) taskSdks := taskapi.FindAll() for _, taskSdk := range taskSdks { fmt.Println("test: ", taskSdk) var tl protomsg.TaskLabel tl.Taskid = taskSdk.Task.Taskid for _, sdkinfo := range taskSdk.Sdks { tl.Sdkids = append(tl.Sdkids, sdkinfo.Id) } fmt.Println(tl) tl.Index=int32(0) tls = append(tls, tl) } GenTasklab(tls) for key, value := range TaskMapLab { fmt.Println() fmt.Println(key, value) @@ -32,6 +39,7 @@ //以 taskid 作为key, 对应的算法组合作为 value func GenTasklab(tasklab []protomsg.TaskLabel) { for _, value := range tasklab { TaskMapLab[value.Taskid] = &value pv := value TaskMapLab[value.Taskid] = &pv } } testBinary files differ