package camera
|
|
import (
|
"errors"
|
|
"basic.com/valib/deliver.git"
|
"github.com/long/taskpubsub/logger"
|
|
"github.com/long/taskpubsub/sdk"
|
"github.com/long/taskpubsub/util"
|
"github.com/gogo/protobuf/proto"
|
"basic.com/pubsub/protomsg.git"
|
|
"context"
|
"fmt"
|
"sync"
|
//"time"
|
|
)
|
|
var SocketManage sync.Map
|
|
type SocketContext struct {
|
Sock deliver.Deliver
|
Context context.Context
|
Cancel context.CancelFunc
|
}
|
|
func Init() {
|
logger.Info("============ camera info ====================")
|
for _, cd := range util.CameraIds {
|
logger.Info(cd)
|
logger.Info()
|
}
|
|
// 摄像机初始化
|
for _, cam := range util.CameraIds {
|
CreateCamera(cam.Id, "camera")
|
}
|
|
// web端初始化
|
CreateCamera("virtual-faceextract-sdk-pull_2" , "web")
|
|
go AutoDelCamera(util.Cameraflag)
|
}
|
|
// camera 接受数据
|
func CreateCamera(id string, remote string) {
|
if _, ok := SocketManage.Load(id); !ok {
|
url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
|
|
socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
|
if err != nil {
|
logger.Error("create socket error")
|
return
|
}
|
|
go Recv(socketlisten, remote)
|
}
|
}
|
|
//动态处理
|
func AutoDelCamera(cameraflag chan bool) {
|
|
for _ = range cameraflag {
|
logger.Info("test autodelcameraflag")
|
var oldcamera []string
|
|
SocketManage.Range(func(k, v interface{}) bool {
|
if str, ok := k.(string); ok {
|
oldcamera = append(oldcamera, str)
|
}
|
return true
|
})
|
|
var newcamera []string
|
|
for _, camnew := range util.CameraIds {
|
newcamera = append(newcamera, camnew.Id)
|
}
|
newcamera = append(newcamera, "virtual-faceextract-sdk-pull_2")
|
|
|
cameraChanDel := util.Difference(oldcamera, newcamera)
|
logger.Info(cameraChanDel)
|
|
for key, op := range cameraChanDel {
|
if op == "add" {
|
CreateCamera(key, "camera")
|
logger.Info("add new camera id=========================", key)
|
} else {
|
if sock, ok := SocketManage.Load(key); ok {
|
if socket, sok := sock.(SocketContext); sok {
|
socket.Cancel()
|
SocketManage.Delete(key)
|
}
|
}
|
logger.Info("删除camera server : ", key)
|
}
|
}
|
}
|
}
|
|
// create server
|
func NewCamerSocketListen(mode int, cameraid string, url string) (socket SocketContext, err error) {
|
ctx, cancel := context.WithCancel(context.Background())
|
|
socket.Context = ctx
|
socket.Cancel = cancel
|
|
socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
|
logger.Info("new socket.Sock: ", socket.Sock)
|
|
if socket.Sock == nil {
|
return socket, errors.New("create listen error")
|
}
|
SocketManage.Store(cameraid, socket)
|
return socket, nil
|
}
|
|
func Recv(socket SocketContext, remote string ) {
|
|
var recvmessage []byte
|
var imagemsg protomsg.Image
|
var err error
|
for {
|
select {
|
case <-socket.Context.Done():
|
logger.Error("listen recv quit")
|
return
|
default:
|
if recvmessage, err = socket.Sock.Recv(); err != nil {
|
//logger.Error("[camera] err is: ", err)
|
continue
|
}
|
|
unmsg, err := util.UnCompress(recvmessage)
|
if err != nil {
|
logger.Error(err)
|
continue
|
}
|
|
if err := proto.Unmarshal(unmsg,&imagemsg); err != nil {
|
logger.Error("recv msg is not protomsgImage")
|
continue
|
}
|
|
switch remote {
|
case "camera":
|
for _, taskid := range GetAlltask(imagemsg.Cid) {
|
//time.Sleep(5 * time.Second)
|
logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskid)
|
Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
|
}
|
case "web":
|
logger.Debug("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
|
Taskdolist(imagemsg.Cid,"", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2", recvmessage)
|
}
|
}
|
}
|
}
|
|
// 据cid 获取 所有的任务
|
func GetAlltask(cid string) (tasks []string) {
|
for _, camsingle := range util.CameraTasks {
|
if cid == camsingle.Camera.Id {
|
for _, tasksingle := range camsingle.Tasks {
|
tasks = append(tasks, tasksingle.Taskid)
|
}
|
return
|
}
|
}
|
return
|
}
|
|
func Taskdolist(cid string, caddr string, taskid string, data []byte) {
|
|
// 数据加工(打标签)
|
logger.Debug("taskid: ",taskid, "has ", len(data), "data[]byte")
|
sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
|
if sdkmsg.Tasklab == nil {
|
logger.Error(cid, " not have taskid: ", taskid )
|
return
|
}
|
|
// 计算分发的主题
|
SendTopic := sdk.SdkSendTopic(sdkmsg)
|
logger.Debug(SendTopic)
|
if _, ok := sdk.SdkMap[SendTopic]; ok {
|
sdk.SdkMap[SendTopic] <- sdkmsg
|
logger.Debug("dispute sendtopic success", SendTopic)
|
} else {
|
logger.Debug("分发的主题不存在")
|
}
|
}
|