// Package sdk routes task messages between in-process topic channels and the
// per-sdk processes reached through mangos PAIR sockets.
package sdk

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/long/test/httpclient"
	"github.com/long/test/tasktag"
	"github.com/long/test/util"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/pair"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
)

// esTopic is the name of the sink topic that receives messages which have
// passed through every sdk listed in their task label.
const esTopic = "es"

// SocketManage maps an sdk id to the listening socket NewSdkListen created
// for it.
// NOTE(review): the map is accessed without synchronization; confirm that
// NewSdkListen is never called concurrently.
var SocketManage = make(map[string]SocketContext)

// SocketContext bundles a mangos socket with the context (and its cancel
// function) that the send and Recv goroutines watch to know when to stop.
type SocketContext struct {
	Sock    mangos.Socket
	Context context.Context
	Cancel  context.CancelFunc
}

// Init fetches the sdk list, creates one topic channel per sdk plus the "es"
// sink topic, opens a TCP listener per sdk and starts the goroutines that
// pump messages in both directions. Sdks whose listener cannot be created are
// skipped.
func Init() {
	sdklist := SdkAll() // fetch every known sdk id
	fmt.Println("sdk list have: ", sdklist)

	SdkCreateTopic(sdklist) // create the topic channels

	for _, id := range sdklist {
		// Create the sdk server socket; port 0 asks for the next free UrlPort.
		sdkid, socket, err := NewSdkListen(id, "tcp", "192.168.1.124", 0)
		if err != nil {
			continue
		}
		// Topic channel -> sdk process.
		go send(sdkid, socket, SdkMap[sdkid])
		// Sdk process -> next topic channel.
		go Recv(socket)
	}

	go es(SdkMap[esTopic])
}

// es drains the "es" topic, which carries messages that have finished every
// sdk stage, and prints each one. It returns when the channel is closed.
func es(sdkmsgchan chan SdkMessage) {
	for data := range sdkmsgchan {
		fmt.Println("this data is finish all sdk! ", data)
	}
}

// AutoDelSdk reconciles SdkMap with Newsdklist: topics reported as "add" by
// util.Difference get a fresh channel, every other reported topic is closed
// and removed. The "es" sink topic is never touched because it is not part of
// the sdk list.
//
// NOTE(review): SdkMap is mutated without synchronization while Recv
// goroutines may be sending on these channels; closing a channel that still
// has a sender panics. Confirm callers serialize this with message routing.
// NOTE(review): added topics only get a channel here; no listener or
// goroutines are started for them in this function.
func AutoDelSdk(Newsdklist []string) {
	var oldSdk []string
	for key := range SdkMap {
		if key == esTopic {
			continue
		}
		oldSdk = append(oldSdk, key)
	}

	sdkChanDel := util.Difference(oldSdk, Newsdklist)
	for key, op := range sdkChanDel {
		if key == esTopic {
			// Never replace or close the sink topic from an sdk-list update.
			continue
		}
		if op == "add" {
			SdkMap[key] = make(chan SdkMessage)
			fmt.Println("创建主题 sdk: ", key)
			continue
		}
		// Guard the close: closing a nil channel (missing key) would panic.
		if ch, ok := SdkMap[key]; ok {
			close(ch)
			delete(SdkMap, key)
		}
		fmt.Println("删除主题 sdk: ", key)
	}
}

// SdkMap holds one topic channel per sdk id, plus the "es" sink topic.
var SdkMap = make(map[string]chan SdkMessage)

// SdkMessage is the payload exchanged with the sdk (algorithm) processes.
type SdkMessage struct {
	Cid     string
	Tasklab tasktag.TaskLabel
	Data    []byte
}

// SdkData builds an SdkMessage for camera/source cid carrying data, labelled
// with the task label registered for taskid in tasktag.TaskMapLab.
func SdkData(cid string, taskid string, data []byte) (sdkmsg SdkMessage) {
	return SdkMessage{
		Cid:     cid,
		Tasklab: tasktag.TaskMapLab[taskid],
		Data:    data,
	}
}

// SdkSendTopic returns the topic a message must be routed to next: the sdk id
// at position Tasklab.Index, or "es" when the index is outside Tasklab.Sdkids.
func SdkSendTopic(sdkmsg SdkMessage) (sdksend string) {
	idx := sdkmsg.Tasklab.Index
	// A negative index is treated like an exhausted list instead of panicking.
	if idx >= 0 && idx < len(sdkmsg.Tasklab.Sdkids) {
		sdksend = sdkmsg.Tasklab.Sdkids[idx]
	} else {
		sdksend = esTopic
	}
	fmt.Println()
	fmt.Println("分发的主题是: ", sdksend)
	fmt.Println()
	return sdksend
}

// SdkAll asks the local HTTP API for the list of sdk ids.
func SdkAll() (sdklist []string) {
	return httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid")
}

// SdkCreateTopic creates an unbuffered topic channel for every id in sdklist
// and for the "es" sink topic. It currently always returns nil.
func SdkCreateTopic(sdklist []string) (err error) {
	for _, sdkid := range sdklist {
		SdkMap[sdkid] = make(chan SdkMessage)
		fmt.Println("create sdk channel: ", sdkid)
	}
	SdkMap[esTopic] = make(chan SdkMessage)
	fmt.Println("create es channel: ")
	return nil
}

// UrlPort is the next TCP port handed out by NewSdkListen when it is called
// with port 0.
var UrlPort = 9000

// NewSdkListen creates a PAIR socket for sdkid and starts listening on it.
//
// protocol is "tcp" (listens on ip:port, taking the next UrlPort when port is
// 0) or "ipc" (listens on an ipc URL named after sdkid). On success the socket
// is registered in SocketManage. On failure the context is cancelled, any
// opened socket is closed, nothing is registered and a non-nil error is
// returned. The returned sid is always sdkid.
func NewSdkListen(sdkid string, protocol string, ip string, port int) (sid string, socket SocketContext, err error) {
	socket.Context, socket.Cancel = context.WithCancel(context.Background())

	var url string
	switch protocol {
	case "tcp":
		if port == 0 {
			port = UrlPort
			UrlPort++
		}
		url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
	case "ipc":
		url = fmt.Sprintf("%s://%s", protocol, sdkid)
	default:
		socket.Cancel()
		return sdkid, socket, fmt.Errorf("sdk %s: unsupported protocol %q", sdkid, protocol)
	}
	fmt.Printf("sdkid= %s url=%s\n", sdkid, url)

	if socket.Sock, err = pair.NewSocket(); err != nil {
		fmt.Println(sdkid, "can't get new pair socket: ", err.Error())
		socket.Cancel()
		return sdkid, socket, err
	}

	// Options are best-effort: a failure is reported but does not abort setup.
	opts := []struct {
		name  string
		value interface{}
	}{
		{mangos.OptionMaxRecvSize, 32 * 1024 * 1024},
		{mangos.OptionWriteQLen, 10},
		{mangos.OptionReadQLen, 10},
	}
	for _, o := range opts {
		if optErr := socket.Sock.SetOption(o.name, o.value); optErr != nil {
			fmt.Println(sdkid, "can't set socket option", o.name, ":", optErr.Error())
		}
	}

	socket.Sock.AddTransport(tcp.NewTransport())
	socket.Sock.AddTransport(ipc.NewTransport())

	if err = socket.Sock.Listen(url); err != nil {
		fmt.Println("socket lisnte error ", sdkid)
		// Release the unusable socket; the listen error is what the caller needs.
		_ = socket.Sock.Close()
		socket.Cancel()
		return sdkid, socket, err
	}

	SocketManage[sdkid] = socket
	return sdkid, socket, nil
}

// Recv reads replies from the sdk process behind socket and forwards each
// JSON-object reply to the topic chosen by SdkSendTopic. It returns when the
// socket's context is cancelled or the socket is closed.
func Recv(socket SocketContext) {
	// The receive deadline lets the loop re-check the context once a second.
	if err := socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second); err != nil {
		fmt.Println("can't set recv deadline: ", err.Error())
	}

	for {
		select {
		case <-socket.Context.Done():
			fmt.Println("socket close")
			return
		default:
		}

		msg, err := socket.Sock.Recv()
		if err != nil {
			if err == mangos.ErrClosed {
				// A closed socket fails immediately forever; stop instead of spinning.
				fmt.Println("socket close")
				return
			}
			// Timeouts and transient errors: go round and re-check the context.
			continue
		}
		route(socket.Context, msg)
	}
}

// route decodes one raw reply and, when it is a JSON object, forwards it as an
// SdkMessage to its next topic. JSON strings are printed; anything else is
// ignored. The forward is abandoned if ctx is cancelled first.
func route(ctx context.Context, raw []byte) {
	var reps interface{}
	if err := json.Unmarshal(raw, &reps); err != nil {
		return
	}

	switch v := reps.(type) {
	case map[string]interface{}:
		var repsdkmsg SdkMessage
		if err := json.Unmarshal(raw, &repsdkmsg); err != nil {
			fmt.Println("can't decode sdk message: ", err.Error())
			return
		}
		nexttopic := SdkSendTopic(repsdkmsg)
		ch, ok := SdkMap[nexttopic]
		if !ok {
			// Sending on a missing (nil) channel would block this goroutine forever.
			fmt.Println("no channel for topic: ", nexttopic)
			return
		}
		select {
		case ch <- repsdkmsg:
		case <-ctx.Done():
		}
	case string:
		fmt.Println("this string is: ", v)
	}
}

// send forwards every message arriving on the topic channel in to the sdk
// process behind socket as JSON. It returns when the socket's context is
// cancelled or when in is closed.
func send(sdkid string, socket SocketContext, in chan SdkMessage) {
	for {
		select {
		case <-socket.Context.Done():
			fmt.Println("socket is close")
			return
		case v, ok := <-in:
			if !ok {
				// The topic was closed; without this check the loop would
				// receive zero values forever.
				fmt.Println("topic channel closed, sdkid =", sdkid)
				return
			}
			fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(v.Data))
			b, err := json.Marshal(v)
			if err != nil {
				fmt.Println("failed marshal: ", err.Error())
				continue
			}
			if err := socket.Sock.Send(b); err != nil {
				fmt.Println("failed send")
				continue
			}
			fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(v.Data))
		}
	}
}