package camera
|
|
import (
|
"basic.com/valib/deliver.git"
|
//"sync"
|
"taskpubsub/logger"
|
|
"taskpubsub/sdk"
|
"taskpubsub/util"
|
|
"basic.com/pubsub/protomsg.git"
|
"github.com/gogo/protobuf/proto"
|
|
// "golang.org/x/sys/unix"
|
|
"fmt"
|
//"time"
|
)
|
|
// Identifiers reserved for the "search by image" face-extraction path.
// Recv dispatches every image whose Cid equals faceExtractWebCID to the
// single task faceExtractWebTaskID instead of the camera's configured tasks.
const (
	// faceExtractWebCID is the virtual camera id that marks such images.
	faceExtractWebCID = "virtual-face-extract-web-camera-id"

	// faceExtractWebTaskID is the fixed task id those images are sent to.
	faceExtractWebTaskID = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"
)
|
|
var SocketManage = make(map[string]util.SocketContext)
|
|
var shm bool = false
|
|
// innerRecvTopic lists the manually configured receive topics. They get a
// receive server in addition to the configured camera ids.
var innerRecvTopic = []string{
	"virtual-faceextract-sdk-pull_2", // to web: search by image
}
|
|
func initInnerTopic() {
|
for _, recvTopic := range innerRecvTopic {
|
createCameraRecvServerAndListen(recvTopic)
|
}
|
}
|
|
func Init(useShm bool) {
|
shm = useShm
|
|
logger.Info("============ camera info ====================")
|
for _, cd := range util.CameraIds {
|
logger.Info(cd)
|
logger.Info()
|
}
|
|
// 摄像机初始化
|
for _, cam := range util.CameraIds {
|
createCameraRecvServerAndListen(cam.Id)
|
}
|
|
// 手动输入的主题
|
initInnerTopic()
|
|
go autoUpdateCamera(util.Cameraflag)
|
}
|
|
// camera 接受数据
|
func createCameraRecvServerAndListen(id string) {
|
if _, isExist := SocketManage[id]; !isExist { //不存在
|
|
url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
|
m := deliver.PushPull
|
if shm {
|
m = deliver.Shm
|
url = id
|
|
}
|
|
fmt.Println("CAMERA URL : ", url)
|
socket, err := util.NewSocketListen(int(m), url, shm)
|
if err != nil {
|
logger.Error("create socket error")
|
return
|
}
|
SocketManage[id] = socket
|
|
go Recv(socket)
|
}
|
}
|
|
func deleteCameraRecvServer(id string) {
|
if _, isExist := SocketManage[id]; isExist { //存在
|
SocketManage[id].Cancel()
|
delete(SocketManage, id)
|
logger.Info("删除server sdk: ", id)
|
}
|
}
|
|
//动态处理
|
func autoUpdateCamera(cameraflag chan bool) {
|
|
for _ = range cameraflag {
|
logger.Info("test autodelcameraflag")
|
var oldcameras []string
|
|
for key := range SocketManage {
|
oldcameras = append(oldcameras, key)
|
}
|
|
var newcameras []string
|
for _, camnew := range util.CameraIds {
|
newcameras = append(newcameras, camnew.Id)
|
}
|
|
// 手动添加的全部加上
|
for _, recvTopic := range innerRecvTopic {
|
newcameras = append(newcameras, recvTopic)
|
}
|
|
cameraListUpdate := util.Difference(oldcameras, newcameras)
|
logger.Info(cameraListUpdate)
|
|
for key, op := range cameraListUpdate {
|
if op == "add" {
|
createCameraRecvServerAndListen(key)
|
} else {
|
deleteCameraRecvServer(key)
|
}
|
}
|
}
|
}
|
|
func Recv(socket util.SocketContext) {
|
|
tryCount := 0
|
|
var recvmessage []byte
|
var imagemsg protomsg.Image
|
var err error
|
for {
|
select {
|
case <-socket.Context.Done():
|
logger.Error("listen recv quit")
|
return
|
default:
|
if recvmessage, err = socket.Sock.Recv(); err != nil {
|
//logger.Error("[camera] err is: ", err)
|
// fmt.Println("CAMERA RECV ERROR: ", err)
|
|
if socket.UseSHM {
|
if tryCount > util.SHMMaxTryCount {
|
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
|
fmt.Println("CAMERA SHM TRY :", tryCount, " RESTART IT")
|
|
tryCount = 0
|
|
continue
|
}
|
tryCount++
|
}
|
continue
|
}
|
|
unmsg, err := util.UnCompress(recvmessage)
|
if err != nil {
|
logger.Error(err)
|
continue
|
}
|
|
if err := proto.Unmarshal(unmsg, &imagemsg); err != nil {
|
logger.Error("recv msg is not protomsgImage")
|
continue
|
}
|
if faceExtractWebCID == imagemsg.Cid { //以图搜图
|
doTaskList(imagemsg.Cid, "", faceExtractWebTaskID, recvmessage)
|
} else {
|
taskIDs := GetAllTaskByID(imagemsg.Cid)
|
for _, taskID := range taskIDs {
|
logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
|
doTaskList(imagemsg.Cid, "", taskID, recvmessage)
|
}
|
}
|
}
|
}
|
}
|
|
// 据cid 获取 所有的任务
|
func GetAllTaskByID(cid string) (tasks []string) {
|
for _, camsingle := range util.CameraTasks {
|
if cid == camsingle.Camera.Id {
|
for _, tasksingle := range camsingle.Tasks {
|
if !tasksingle.Enable {
|
continue
|
}
|
tasks = append(tasks, tasksingle.Taskid)
|
}
|
return
|
}
|
}
|
return
|
}
|
|
func doTaskList(cid string, caddr string, taskid string, data []byte) {
|
|
// 数据加工(打标签)
|
logger.Debug("taskid: ", taskid, "has ", len(data), "data[]byte")
|
sdkmsg := sdk.ToSdkMsg(cid, caddr, taskid, data)
|
if sdkmsg.Tasklab == nil {
|
logger.Error(cid, " not have taskid: ", taskid)
|
return
|
}
|
|
// 计算分发的主题
|
SendTopic := sdk.GetSdkSendTopic(sdkmsg)
|
logger.Debug(SendTopic)
|
if _, ok := sdk.SdkMap[SendTopic]; ok {
|
sdk.SdkMap[SendTopic] <- sdkmsg
|
logger.Debug("dispute sendtopic success", SendTopic)
|
} else {
|
logger.Debug("分发的主题不存在")
|
}
|
}
|