// Package camera keeps one deliver socket per camera and hands every message
// received on those sockets to the SDK task channels.
package camera

import (
	"context"
	"errors"
	"fmt"
	"sync"
	//"time"

	"basic.com/dbapi.git"
	"basic.com/valib/deliver.git"
	"github.com/long/test/sdk"
	"github.com/long/test/util"
)

//var SocketManage = make(map[string]SocketContext)

// SocketManage maps a camera id (string) to the SocketContext created for it
// by NewCamerSocketListen.
var SocketManage sync.Map

// Initchannel carries the ids of cameras that need a socket. It is unbuffered,
// so every send blocks until CreateCamera receives it.
var Initchannel = make(chan string)

// SocketContext bundles a camera socket with the context that stops its
// receive loop.
type SocketContext struct {
	Sock    deliver.Deliver
	Context context.Context
	Cancel  context.CancelFunc
}

var camval dbapi.CameraApi

// Init prints the configured cameras, starts the CreateCamera and
// AutoDelCamera goroutines, and then queues every id in util.CameraIds on
// Initchannel.
func Init() {
	fmt.Println("============ camera info ====================")
	for _, cd := range util.CameraIds {
		fmt.Println(cd)
		fmt.Println()
	}

	go CreateCamera(Initchannel)
	go AutoDelCamera(util.Cameraflag)

	for _, cam := range util.CameraIds {
		Initchannel <- cam.Id
	}
}

// CreateCamera consumes camera ids from the given channel until it is closed.
// For every id that has no entry in SocketManage yet, it creates a socket at
// ipc:///tmp/<id>.ipc and starts a Recv goroutine for it. Ids whose socket
// cannot be created are logged and skipped.
func CreateCamera(camera chan string) {
	for camid := range camera {
		if _, exists := SocketManage.Load(camid); exists {
			continue
		}

		url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
		id, sock, err := NewCamerSocketListen(deliver.PushPull, camid, url)
		if err != nil {
			fmt.Println("create socket error")
			continue
		}

		go Recv(id, sock)
	}
}

// AutoDelCamera reconciles the running sockets with util.CameraIds each time
// a value arrives on cameraflag. Ids that util.Difference marks as "add" are
// queued on Initchannel; every other id has its context cancelled and its
// SocketManage entry removed.
func AutoDelCamera(cameraflag chan bool) {
	for range cameraflag {
		fmt.Println("test autodelcameraflag")

		// Ids that currently have a registered socket.
		var oldcamera []string
		SocketManage.Range(func(k, _ interface{}) bool {
			if id, ok := k.(string); ok {
				oldcamera = append(oldcamera, id)
			}
			return true
		})

		// Ids that should exist according to the current configuration.
		var newcamera []string
		for _, value := range util.CameraIds {
			newcamera = append(newcamera, value.Id)
		}

		cameraChanDel := util.Difference(oldcamera, newcamera)
		fmt.Println(cameraChanDel)

		for key, op := range cameraChanDel {
			if op == "add" {
				Initchannel <- key
				continue
			}

			if stored, ok := SocketManage.Load(key); ok {
				if socket, sok := stored.(SocketContext); sok {
					socket.Cancel()
					SocketManage.Delete(key)
				}
			}
			fmt.Println("删除camera server : ", key)
		}
	}
}

// NewCamerSocketListen creates a deliver server socket for cameraid on url,
// attaches a fresh cancellable context to it and registers the result in
// SocketManage.
//
// It returns cameraid unchanged together with the new SocketContext. When
// deliver.NewServer yields nil, an error is returned, nothing is stored in
// SocketManage, and the context is cancelled before returning.
func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	socket.Context = ctx
	socket.Cancel = cancel

	socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
	fmt.Println("new socket.Sock: ", socket.Sock)
	if socket.Sock == nil {
		// The socket is not registered anywhere, so release the context
		// here instead of leaking it.
		cancel()
		return cameraid, socket, errors.New("create listen error")
	}

	SocketManage.Store(cameraid, socket)
	return cameraid, socket, nil
}

// Recv reads messages from socket until its context is cancelled and passes
// each message to Taskdolist once per task returned by GetAlltask(cameraid).
//
// NOTE(review): cancellation is only checked between Recv calls, and a Recv
// error is retried immediately without any delay — confirm that
// deliver.Deliver.Recv cannot block forever or fail persistently.
func Recv(cameraid string, socket SocketContext) {
	var msg []byte
	var err error

	for {
		select {
		case <-socket.Context.Done():
			fmt.Println("listen recv quit")
			return
		default:
		}

		if msg, err = socket.Sock.Recv(); err != nil {
			fmt.Println("err is: ", cameraid, err)
			continue
		}

		fmt.Println()
		fmt.Println("============== one msg input ==========")
		for _, taskid := range GetAlltask(cameraid) {
			//time.Sleep(5 * time.Second)
			fmt.Println("cameraid: ", cameraid, " taskid: ", taskid)
			Taskdolist(cameraid, taskid, msg)
		}
	}
}

// GetAlltask returns the task ids of the first entry in util.CameraTasks
// whose camera id equals cid, or nil when no entry matches.
func GetAlltask(cid string) (tasks []string) {
	for _, camsingle := range util.CameraTasks {
		if cid != camsingle.Camera.Id {
			continue
		}
		for _, tasksingle := range camsingle.Tasks {
			tasks = append(tasks, tasksingle.Taskid)
		}
		return tasks
	}
	return tasks
}

// Taskdolist wraps data for the given camera and task via sdk.SdkData and
// sends the result on the sdk.SdkMap channel selected by sdk.SdkSendTopic.
// Messages without a task label, or whose topic has no channel, are logged
// and dropped.
func Taskdolist(cid string, taskid string, data []byte) {
	// Process the data (attach labels).
	sdkmsg := sdk.SdkData(cid, taskid, data)
	if sdkmsg.Tasklab == nil {
		fmt.Printf("cid:%s 没有任务%s\n", cid, taskid)
		return
	}

	// Work out which topic the message is dispatched to.
	topic := sdk.SdkSendTopic(sdkmsg)
	ch, ok := sdk.SdkMap[topic]
	if !ok {
		fmt.Println("分发的主题不存在")
		return
	}

	fmt.Println("分发的主题存在")
	ch <- sdkmsg
	//fmt.Println("重新开始循环: ", sdk.SdkMap)
}