package camera
|
|
import (
|
"errors"
|
|
"basic.com/dbapi.git"
|
"basic.com/valib/deliver.git"
|
|
"github.com/long/test/sdk"
|
"github.com/long/test/util"
|
|
"context"
|
"fmt"
|
"sync"
|
//"time"
|
)
|
|
var (
	// SocketManage holds the live camera sockets, keyed by camera id
	// (string) with SocketContext values. It is a sync.Map rather than a
	// plain map[string]SocketContext because it is written by
	// NewCamerSocketListen and read/deleted by AutoDelCamera, which run in
	// different goroutines.
	SocketManage sync.Map

	// Initchannel carries the ids of cameras whose listening socket should
	// be created. It is unbuffered, so every send blocks until
	// CreateCamera has picked the id up.
	Initchannel = make(chan string)
)
|
|
type SocketContext struct {
|
Sock deliver.Deliver
|
Context context.Context
|
Cancel context.CancelFunc
|
}
|
|
// camval is a package-level dbapi.CameraApi value. It is not referenced by
// any code in the visible part of this file.
var camval dbapi.CameraApi
|
|
func Init() {
|
|
fmt.Println("============ camera info ====================")
|
for _, cd := range util.CameraIds {
|
fmt.Println(cd)
|
fmt.Println()
|
}
|
|
go CreateCamera(Initchannel)
|
go AutoDelCamera(util.Cameraflag)
|
|
for _, cam := range util.CameraIds {
|
Initchannel <- cam.Id
|
}
|
}
|
|
func CreateCamera(camera chan string) {
|
for camid := range camera {
|
if _, ok := SocketManage.Load(camid); !ok {
|
url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
|
|
id, socketlisten, err := NewCamerSocketListen(deliver.PushPull, camid, url)
|
if err != nil {
|
fmt.Println("create socket error")
|
continue
|
}
|
|
go func(cid string, sock SocketContext) {
|
Recv(cid, sock)
|
}(id, socketlisten)
|
}
|
}
|
}
|
|
//动态处理
|
func AutoDelCamera(cameraflag chan bool) {
|
|
for _ = range cameraflag {
|
fmt.Println("test autodelcameraflag")
|
var oldcamera []string
|
|
SocketManage.Range(func(k, v interface{}) bool {
|
if str, ok := k.(string); ok {
|
oldcamera = append(oldcamera, str)
|
}
|
return true
|
})
|
|
var newcamera []string
|
|
for _, value := range util.CameraIds {
|
newcamera = append(newcamera, value.Id)
|
}
|
|
cameraChanDel := util.Difference(oldcamera, newcamera)
|
fmt.Println(cameraChanDel)
|
|
for key, op := range cameraChanDel {
|
if op == "add" {
|
Initchannel <- key
|
} else {
|
if sock, ok := SocketManage.Load(key); ok {
|
if socket, sok := sock.(SocketContext); sok {
|
socket.Cancel()
|
SocketManage.Delete(key)
|
}
|
}
|
fmt.Println("删除camera server : ", key)
|
}
|
}
|
}
|
}
|
|
// create server
|
func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) {
|
ctx, cancel := context.WithCancel(context.Background())
|
|
socket.Context = ctx
|
socket.Cancel = cancel
|
|
socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
|
fmt.Println("new socket.Sock: ", socket.Sock)
|
|
if socket.Sock == nil {
|
return cameraid, socket, errors.New("create listen error")
|
}
|
SocketManage.Store(cameraid, socket)
|
return cameraid, socket, nil
|
}
|
|
func Recv(cameraid string, socket SocketContext) {
|
var msg []byte
|
var err error
|
for {
|
select {
|
case <-socket.Context.Done():
|
fmt.Println("listen recv quit")
|
return
|
default:
|
if msg, err = socket.Sock.Recv(); err != nil {
|
fmt.Println("err is: ", cameraid, err)
|
continue
|
} else {
|
fmt.Println()
|
fmt.Println("============== one msg input ==========")
|
for _, taskid := range GetAlltask(cameraid) {
|
//time.Sleep(5 * time.Second)
|
fmt.Println("cameraid: ", cameraid, " taskid: ", taskid)
|
Taskdolist(cameraid, taskid, msg)
|
}
|
}
|
}
|
}
|
}
|
|
// 根据cid 获取 所有的任务
|
func GetAlltask(cid string) (tasks []string) {
|
for _, camsingle := range util.CameraTasks {
|
if cid == camsingle.Camera.Id {
|
for _, tasksingle := range camsingle.Tasks {
|
tasks = append(tasks, tasksingle.Taskid)
|
}
|
return
|
}
|
}
|
return
|
}
|
|
func Taskdolist(cid string, taskid string, data []byte) {
|
|
// 数据加工(打标签)
|
sdkmsg := sdk.SdkData(cid, taskid, data)
|
if sdkmsg.Tasklab == nil {
|
fmt.Printf("cid:%s 没有任务%s\n", cid, taskid)
|
return
|
}
|
|
// 计算分发的主题
|
SendTopic := sdk.SdkSendTopic(sdkmsg)
|
if _, ok := sdk.SdkMap[SendTopic]; ok {
|
fmt.Println("分发的主题存在")
|
sdk.SdkMap[SendTopic] <- sdkmsg
|
//fmt.Println("重新开始循环: ", sdk.SdkMap)
|
} else {
|
fmt.Println("分发的主题不存在")
|
}
|
}
|