// Package camera listens on one socket per camera, file or manually configured
// topic, decodes the received image messages and dispatches them to the SDK
// channels of the tasks attached to the originating camera.
package camera

import (
	"strings"
	"sync"

	"taskpubsub/sdk"
	"taskpubsub/util"

	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/deliver.git"
	"basic.com/valib/logger.git"
	"github.com/gogo/protobuf/proto"
)

const (
	// faceExtractWebCID is the camera id carried by search-by-image requests.
	faceExtractWebCID = "virtual-face-extract-web-camera-id"
	// faceExtractWebTaskID is the fixed task id used for search-by-image requests.
	faceExtractWebTaskID = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"
)

// SocketManage maps a camera / file / topic id to the socket context that
// listens for it. Code in this file only touches it while holding socketMu.
var SocketManage = make(map[string]util.SocketContext)

// socketMu guards SocketManage. autoUpdateCamera and autoUpdateFiles run as
// separate goroutines and both read and mutate the map.
var socketMu sync.Mutex

// shm records whether sockets are created in shared-memory mode (set by Init).
var shm bool

// innerRecvTopic lists the manually configured receive topics.
var innerRecvTopic = []string{
	"virtual-faceextract-sdk-pull_2", // to web: search by image
}

// initInnerTopic is currently a no-op; the loop that would create a listener
// for every manually configured topic is disabled.
func initInnerTopic() {
	// for _, recvTopic := range innerRecvTopic {
	// 	createCameraRecvServerAndListen(recvTopic)
	// }
}

// Init stores the socket mode, creates a listener for every id in
// util.CameraIds and util.FileArr, and starts the two goroutines that keep
// the listener set in sync when util.Cameraflag / util.Fileflag fire.
func Init(useShm bool) {
	shm = useShm

	logger.Info("============ camera info ====================")
	for _, cd := range util.CameraIds {
		logger.Info(cd)
		logger.Info()
	}

	// Camera initialization.
	for _, cam := range util.CameraIds {
		createCameraRecvServerAndListen(cam.Id)
	}

	// File initialization.
	for _, f := range util.FileArr {
		logger.Info(f)
	}
	for _, f := range util.FileArr {
		createCameraRecvServerAndListen(f.Id)
	}

	// Manually entered topics.
	initInnerTopic()

	go autoUpdateCamera(util.Cameraflag)
	go autoUpdateFiles(util.Fileflag)
}

// createCameraRecvServerAndListen creates the receive socket for id, registers
// it in SocketManage and starts a Recv goroutine on it. It does nothing when a
// socket for id is already registered, and only logs when creation fails.
func createCameraRecvServerAndListen(id string) {
	socketMu.Lock()
	defer socketMu.Unlock()

	if _, exists := SocketManage[id]; exists {
		return
	}

	mode := deliver.PushPull
	url := "ipc:///tmp/" + id + ".ipc"
	if shm {
		// In shared-memory mode the bare id is used as the address.
		mode = deliver.Shm
		url = id
	}
	logger.Info("CAMERA URL : ", url)

	socket, err := util.NewSocketListen(int(mode), url, shm)
	if err != nil {
		logger.Error("create socket error: ", err)
		return
	}
	SocketManage[id] = socket
	go Recv(socket)
}

// deleteCameraRecvServer cancels and unregisters the socket for id, if any.
func deleteCameraRecvServer(id string) {
	socketMu.Lock()
	socket, exists := SocketManage[id]
	if exists {
		delete(SocketManage, id)
	}
	socketMu.Unlock()

	if !exists {
		return
	}
	socket.Cancel()
	logger.Info("删除server sdk: ", id)
}

// isFileID reports whether id carries one of the file id prefixes
// (video, audio or image).
func isFileID(id string) bool {
	return strings.HasPrefix(id, util.File_Video_Id_Pre) ||
		strings.HasPrefix(id, util.File_Audio_Id_Pre) ||
		strings.HasPrefix(id, util.File_Img_Id_Pre)
}

// registeredIDs returns the ids currently present in SocketManage whose
// file-ness (as reported by isFileID) equals wantFile.
func registeredIDs(wantFile bool) []string {
	socketMu.Lock()
	defer socketMu.Unlock()

	var ids []string
	for id := range SocketManage {
		if isFileID(id) == wantFile {
			ids = append(ids, id)
		}
	}
	return ids
}

// autoUpdateCamera reconciles the non-file listeners with util.CameraIds plus
// the manually configured topics every time cameraflag delivers a value.
func autoUpdateCamera(cameraflag chan bool) {
	for range cameraflag {
		logger.Info("test autodelcameraflag")

		oldcameras := registeredIDs(false)

		var newcameras []string
		for _, camnew := range util.CameraIds {
			newcameras = append(newcameras, camnew.Id)
		}
		// Manually added topics are always part of the wanted set.
		newcameras = append(newcameras, innerRecvTopic...)

		cameraListUpdate := util.Difference(oldcameras, newcameras)
		logger.Info(cameraListUpdate)
		for key, op := range cameraListUpdate {
			if op == "add" {
				createCameraRecvServerAndListen(key)
			} else {
				deleteCameraRecvServer(key)
			}
		}
	}
}

// autoUpdateFiles reconciles the file listeners with util.FileArr every time
// fileflag delivers a value.
func autoUpdateFiles(fileflag chan bool) {
	for range fileflag {
		logger.Info("do auto del fileflag")

		oldIds := registeredIDs(true)

		var newIds []string
		for _, f := range util.FileArr {
			newIds = append(newIds, f.Id)
		}

		fileListUpdate := util.Difference(oldIds, newIds)
		logger.Info(fileListUpdate)
		for key, op := range fileListUpdate {
			if op == "add" {
				createCameraRecvServerAndListen(key)
			} else {
				deleteCameraRecvServer(key)
			}
		}
	}
}

// Recv loops on socket until its Context is done. Every received payload is
// uncompressed and decoded as a protomsg.Image only to learn its Cid; the raw
// (still compressed) payload is what gets handed to doTaskList.
//
// In shared-memory mode the socket is closed and re-created once more than
// util.ShmMaxTryCount consecutive receive errors were counted. If the
// re-creation fails the error is logged and Recv returns, because the old
// socket has already been closed.
func Recv(socket util.SocketContext) {
	var (
		tryCount int
		imagemsg protomsg.Image
	)

	for {
		select {
		case <-socket.Context.Done():
			socket.Sock.Close()
			logger.Error("listen recv quit")
			return
		default:
		}

		recvmessage, err := socket.Sock.Recv()
		if err != nil {
			// tryCount++
			// socket = util.MaybeRestartSocket(socket, &tryCount)
			if !socket.UseSHM {
				continue
			}
			if tryCount <= util.ShmMaxTryCount {
				tryCount++
				continue
			}

			socket.Sock.Close()
			restarted, rerr := util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
			logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
			if rerr != nil {
				logger.Error("restart shm socket error: ", rerr)
				return
			}
			// Keep watching the original Context: the entry stored in
			// SocketManage is the one deleteCameraRecvServer cancels.
			// NOTE(review): assumes NewSocketListen returns its own fresh
			// Context - confirm against util.
			restarted.Context = socket.Context
			socket = restarted
			tryCount = 0
			continue
		}

		unmsg, uerr := util.UnCompress(recvmessage)
		if uerr != nil {
			logger.Error(uerr)
			continue
		}
		if perr := proto.Unmarshal(unmsg, &imagemsg); perr != nil {
			logger.Error("recv msg is not protomsgImage")
			continue
		}

		if imagemsg.Cid == faceExtractWebCID {
			// Search by image.
			logger.Info("id: ", faceExtractWebTaskID)
			doTaskList(imagemsg.Cid, "", faceExtractWebTaskID, recvmessage)
			continue
		}
		for _, taskID := range GetAllTaskByID(imagemsg.Cid) {
			doTaskList(imagemsg.Cid, "", taskID, recvmessage)
		}
	}
}

// GetAllTaskByID returns the ids of the enabled tasks of the first entry in
// util.CameraTasks whose camera id equals cid. The result is nil when no entry
// matches or the matching entry has no enabled task.
func GetAllTaskByID(cid string) (tasks []string) {
	for _, camTasks := range util.CameraTasks {
		if camTasks.Camera.Id != cid {
			continue
		}
		for _, task := range camTasks.Tasks {
			if task.Enable {
				tasks = append(tasks, task.Taskid)
			}
		}
		return tasks
	}
	return tasks
}

// doTaskList wraps data into an SDK message for (cid, caddr, taskid) and sends
// it on the channel registered in sdk.SdkMap under the computed send topic.
// It logs and returns when the message has no Tasklab or the topic has no
// registered channel.
func doTaskList(cid string, caddr string, taskid string, data []byte) {
	// Data processing (tagging).
	//logger.Info("taskid: ", taskid, "has ", len(data), "data[]byte")
	sdkmsg := sdk.ToSdkMsg(cid, caddr, taskid, data)
	if sdkmsg.Tasklab == nil {
		logger.Error(cid, " not have taskid: ", taskid)
		return
	}

	// Compute the topic to dispatch to.
	sendTopic := sdk.GetSdkSendTopic(sdkmsg)
	//logger.Info(SendTopic)

	// These two topics get extra trace logging around the send.
	traced := sendTopic == "facedetect-sdk-no-track" || sendTopic == "virtual-faceextract-sdk-pull"
	if traced {
		logger.Info(sendTopic)
	}

	ch, ok := sdk.SdkMap[sendTopic]
	if !ok {
		logger.Info("分发的主题不存在")
		return
	}

	if traced {
		logger.Info(sendTopic)
	}
	ch <- sdkmsg
	if traced {
		logger.Info("dispute sendtopic success", sendTopic)
	}
}