// Package camera creates one deliver socket per camera id, receives
// compressed protomsg.Image messages on those sockets and hands each message
// to the SDK dispatch channels, once per task configured for the camera.
package camera

import (
	"context"
	"errors"
	"fmt"
	"sync"
	// "time"

	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/deliver.git"
	"github.com/gogo/protobuf/proto"
	"github.com/long/test/sdk"
	"github.com/long/test/util"
)

// SocketManage maps a camera id (string) to the SocketContext that was
// created for it by NewCamerSocketListen.
var SocketManage sync.Map

// SocketContext bundles a listening socket with the context used to stop the
// goroutine that receives from it.
type SocketContext struct {
	Sock    deliver.Deliver
	Context context.Context
	Cancel  context.CancelFunc
}

// Init prints the configured cameras, then creates a receiving socket for
// every entry of util.CameraIds plus one fixed socket for the web endpoint.
func Init() {
	fmt.Println("============ camera info ====================")
	for _, cd := range util.CameraIds {
		fmt.Println(cd)
		fmt.Println()
	}

	// Initialize the cameras.
	for _, cam := range util.CameraIds {
		CreateCamera(cam.Id, "camera")
	}

	// Initialize the web endpoint.
	CreateCamera("virtual-faceextract-sdk-pull_2", "web")

	// go AutoDelCamera(util.Cameraflag)
}

// CreateCamera starts receiving data for the given id unless SocketManage
// already holds an entry for it. The socket listens on ipc:///tmp/<id>.ipc
// and is served by a new goroutine running Recv. remote is passed through to
// Recv and selects how received messages are dispatched ("camera" or "web").
func CreateCamera(id string, remote string) {
	if _, exists := SocketManage.Load(id); exists {
		return
	}

	url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
	listener, err := NewCamerSocketListen(deliver.PushPull, id, url)
	if err != nil {
		fmt.Println("create socket error")
		return
	}

	go Recv(listener, remote)
}

// AutoDelCamera reconciles the set of running camera sockets with
// util.Sdklist every time a value arrives on cameraflag. It returns when
// cameraflag is closed.
//
// For each entry reported by util.Difference, the value "add" creates a new
// camera socket; any other value cancels and removes the existing one.
func AutoDelCamera(cameraflag chan bool) {
	for range cameraflag {
		fmt.Println("test autodelcameraflag")

		// Snapshot the ids that currently have a socket.
		var oldcamera []string
		SocketManage.Range(func(k, v interface{}) bool {
			if id, ok := k.(string); ok {
				oldcamera = append(oldcamera, id)
			}
			return true
		})

		var newcamera []string
		newcamera = append(newcamera, util.Sdklist...)

		cameraChanDel := util.Difference(oldcamera, newcamera)
		fmt.Println(cameraChanDel)

		for key, op := range cameraChanDel {
			if op == "add" {
				CreateCamera(key, "camera")
				continue
			}

			if stored, ok := SocketManage.Load(key); ok {
				if sc, isCtx := stored.(SocketContext); isCtx {
					// Cancelling the context makes the Recv goroutine exit
					// the next time it checks Context.Done().
					sc.Cancel()
					SocketManage.Delete(key)
				}
			}
			fmt.Println("删除camera server : ", key)
		}
	}
}

// NewCamerSocketListen creates a deliver server in the given mode on url,
// wraps it together with a fresh cancellable context and registers the result
// in SocketManage under cameraid.
//
// It returns an error when deliver.NewServer yields nil. In that case nothing
// is stored in SocketManage and the context in the returned value has already
// been cancelled, so it is not leaked.
func NewCamerSocketListen(mode int, cameraid string, url string) (socket SocketContext, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	socket.Context = ctx
	socket.Cancel = cancel
	socket.Sock = deliver.NewServer(deliver.Mode(mode), url)

	fmt.Println("new socket.Sock: ", socket.Sock)
	if socket.Sock == nil {
		// Release the context; nobody will ever call Cancel for a socket
		// that was never registered.
		cancel()
		return socket, errors.New("create listen error")
	}

	SocketManage.Store(cameraid, socket)
	return socket, nil
}

// Recv loops until socket.Context is cancelled. Each iteration receives one
// message from socket.Sock, decompresses it, decodes it as a protomsg.Image
// and dispatches the original (still compressed) bytes through Taskdolist.
//
// With remote == "camera" the message is dispatched once per task returned by
// GetAlltask for the image's camera id; with remote == "web" it is dispatched
// once under a fixed task id. Any other value of remote drops the message.
// Receive, decompress and decode failures are logged and the loop continues.
func Recv(socket SocketContext, remote string) {
	// Fixed task id used for messages arriving on the web socket.
	const webTaskID = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"

	var imagemsg protomsg.Image

	for {
		// Non-blocking cancellation check before every receive.
		select {
		case <-socket.Context.Done():
			fmt.Println("listen recv quit")
			return
		default:
		}

		recvmessage, err := socket.Sock.Recv()
		if err != nil {
			fmt.Println("err is: ", err)
			continue
		}

		unmsg, err := util.UnCompress(recvmessage)
		if err != nil {
			fmt.Println(err)
			continue
		}

		if err := proto.Unmarshal(unmsg, &imagemsg); err != nil {
			fmt.Println("recv msg is not protomsgImage")
			continue
		}

		fmt.Println("============== one msg input ==========")
		switch remote {
		case "camera":
			for _, taskid := range GetAlltask(imagemsg.Cid) {
				// time.Sleep(5 * time.Second)
				fmt.Println("id: ", imagemsg.Cid, " taskid: ", taskid)
				Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
			}
		case "web":
			fmt.Println("id: ", imagemsg.Cid, " taskid: ", webTaskID)
			Taskdolist(imagemsg.Cid, "", webTaskID, recvmessage)
		}
	}
}

// GetAlltask returns the task ids configured in util.CameraTasks for the
// camera whose id equals cid. Only the first matching camera is considered;
// the result is nil when no camera matches or the match has no tasks.
func GetAlltask(cid string) (tasks []string) {
	for _, camsingle := range util.CameraTasks {
		if cid != camsingle.Camera.Id {
			continue
		}
		for _, tasksingle := range camsingle.Tasks {
			tasks = append(tasks, tasksingle.Taskid)
		}
		return tasks
	}
	return nil
}

// Taskdolist tags data with its camera id, camera address and task id via
// sdk.SdkData, computes the dispatch topic with sdk.SdkSendTopic and sends
// the tagged message on the channel registered for that topic in sdk.SdkMap.
//
// Nothing is sent when the tagged message has a nil Tasklab or when no
// channel is registered for the topic; both cases are only logged. The send
// itself blocks until the channel accepts the message.
func Taskdolist(cid string, caddr string, taskid string, data []byte) {
	// Process the data (attach the task label).
	sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
	if sdkmsg.Tasklab == nil {
		fmt.Printf("cid:%s 没有任务%s\n", cid, taskid)
		return
	}

	// Compute the topic the message is dispatched to.
	topic := sdk.SdkSendTopic(sdkmsg)
	fmt.Println(topic)

	// Single lookup: fetch the channel and send on it directly.
	ch, ok := sdk.SdkMap[topic]
	if !ok {
		fmt.Println("分发的主题不存在")
		return
	}
	ch <- sdkmsg
	fmt.Println("dispute sendtopic success", topic)
}