package camera import ( "errors" "basic.com/valib/deliver.git" "github.com/long/test/sdk" "github.com/long/test/util" "basic.com/pubsub/protomsg.git" "context" "fmt" "sync" // "time" ) //var SocketManage = make(map[string]SocketContext) var SocketManage sync.Map var Initchannel = make(chan protomsg.Camera ) type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } func Init() { fmt.Println("============ camera info ====================") for _, cd := range util.CameraIds { fmt.Println(cd) fmt.Println() } go CreateCamera(Initchannel) go AutoDelCamera(util.Cameraflag) for _, cam := range util.CameraIds { Initchannel <- cam } } func CreateCamera(camera chan protomsg.Camera) { for cam := range camera { camid := cam.Id caddr := cam.Addr if _, ok := SocketManage.Load(camid); !ok { url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid) id, socketlisten, err := NewCamerSocketListen(deliver.PushPull, camid, url) if err != nil { fmt.Println("create socket error") continue } go func(cid string, cameraaddr string, sock SocketContext) { Recv(cid, cameraaddr, sock) }(id, caddr, socketlisten) } } } //动态处理 func AutoDelCamera(cameraflag chan bool) { for _ = range cameraflag { fmt.Println("test autodelcameraflag") var oldcamera []string SocketManage.Range(func(k, v interface{}) bool { if str, ok := k.(string); ok { oldcamera = append(oldcamera, str) } return true }) var newcamera []string for _, value := range util.CameraIds { newcamera = append(newcamera, value.Id) } cameraChanDel := util.Difference(oldcamera, newcamera) fmt.Println(cameraChanDel) for key, op := range cameraChanDel { if op == "add" { for _, value := range util.CameraIds { if key == value.Id{ Initchannel <- value } } } else { if sock, ok := SocketManage.Load(key); ok { if socket, sok := sock.(SocketContext); sok { socket.Cancel() SocketManage.Delete(key) } } fmt.Println("删除camera server : ", key) } } } } // create server func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) { ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewServer(deliver.Mode(mode), url) fmt.Println("new socket.Sock: ", socket.Sock) if socket.Sock == nil { return cameraid, socket, errors.New("create listen error") } SocketManage.Store(cameraid, socket) return cameraid, socket, nil } func Recv(cameraid string, caddr string , socket SocketContext) { var msg []byte var err error for { select { case <-socket.Context.Done(): fmt.Println("listen recv quit") return default: if msg, err = socket.Sock.Recv(); err != nil { fmt.Println("err is: ", cameraid, err) continue } else { fmt.Println() fmt.Println("============== one msg input ==========") for _, taskid := range GetAlltask(cameraid) { // time.Sleep(5 * time.Second) fmt.Println("cameraid: ", cameraid, " taskid: ", taskid) Taskdolist(cameraid, caddr, taskid, msg) } } } } } // 据cid 获取 所有的任务 func GetAlltask(cid string) (tasks []string) { for _, camsingle := range util.CameraTasks { if cid == camsingle.Camera.Id { for _, tasksingle := range camsingle.Tasks { tasks = append(tasks, tasksingle.Taskid) } return } } return } func Taskdolist(cid string, caddr string, taskid string, data []byte) { // 数据加工(打标签) sdkmsg := sdk.SdkData(cid, caddr, taskid, data) if sdkmsg.Tasklab == nil { fmt.Printf("cid:%s 没有任务%s\n", cid, taskid) return } // 计算分发的主题 SendTopic := sdk.SdkSendTopic(sdkmsg) if _, ok := sdk.SdkMap[SendTopic]; ok { sdk.SdkMap[SendTopic] <- sdkmsg } else { fmt.Println("分发的主题不存在") } }