package sdk import ( "context" "errors" "fmt" // "github.com/long/test/httpclient" "github.com/long/test/tasktag" "github.com/long/test/util" "github.com/gogo/protobuf/proto" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" "github.com/long/test/logger" ) const ( postPull = "_1.ipc" postPush = "_2.ipc" ) var SocketManage = make(map[string]SocketContext) var SdkMap = make(map[string]chan protomsg.SdkMessage) type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } func Init() { logger.Info("============= init sdk info =====================") for _, sdkid := range util.Sdklist { // 创建sdk server CreatesdkTopicandServer(sdkid) logger.Info() } // 手动输入的主题 // sdk-no-track CreatesdkTopicandServer("facedetect-sdk-no-track") logger.Info() // es SdkMap["es"] = make(chan protomsg.SdkMessage) logger.Info("create es channel: ") // web SdkMap["virtual-faceextract-sdk-pull"] = make(chan protomsg.SdkMessage) logger.Info("create virtual-faceextract-sdk-pull") Createwebserver("virtual-faceextract-sdk-pull") go Dealextern() go AutoDelSdk(util.Sdkflag) } func CreatesdkTopicandServer(sdkid string) { SdkMap[sdkid] = make(chan protomsg.SdkMessage) logger.Info("create sdk channel: ", sdkid) url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull) socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { delete(SdkMap, sdkid) logger.Error(sdkid, "create server error!") return } go Send(sdkid, socketser, SdkMap[sdkid]) url = fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPush) socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { delete(SdkMap, sdkid) logger.Error(sdkid, "create dial error!") return } go Recv(socketdial) } // web 接受端处理 func Createwebserver(webid string ){ url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull) socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url) if err != nil { logger.Error(webid, "create server error!") return } go Send(webid, socketser, SdkMap[webid]) } func DeletesdkTopicandServer(sdkid string) { close(SdkMap[sdkid]) delete(SdkMap, sdkid) logger.Info("删除主题 sdk: ", sdkid) SocketManage[sdkid].Cancel() delete(SocketManage, sdkid) logger.Info("删除server sdk: ", sdkid) } //单独处理 es 主题的情况 func Dealextern() { for { select { case <-SdkMap["es"]: //logger.Info("es finanl sdk!") } } } //动态处理 func AutoDelSdk(sdkflag chan bool) { for _ = range sdkflag { logger.Info("test autodelsdk") var oldSdk []string for key, _ := range SdkMap { oldSdk = append(oldSdk, key) } // 手动添加的全部加上 util.Sdklist = append(util.Sdklist, "es") util.Sdklist = append(util.Sdklist, "virtual-faceextract-sdk-pull") util.Sdklist = append(util.Sdklist, "facedetect-sdk-no-track") sdkChanDel := util.Difference(oldSdk, util.Sdklist) logger.Info(sdkChanDel) for key, op := range sdkChanDel { if op == "add" { CreatesdkTopicandServer(key) } else { DeletesdkTopicandServer(key) logger.Info("删除主题 sdk: ", key) } } } } //sdk数据 加工器 func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage { var sdkmsg = protomsg.SdkMessage{} sdkmsg.Cid = cid sdkmsg.Caddr =caddr if val, ok := tasktag.TaskMapLab.Load(taskid); !ok { sdkmsg.Tasklab = nil return sdkmsg } else { sdkmsg.Tasklab = val.(*protomsg.TaskLabel) sdkmsg.Data = data } return sdkmsg } //sdk数据分发器 func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid } else { sdksend = "es" } logger.Debug("分发的主题: ", sdksend , "位置:", int(sdkmsg.Tasklab.Index)+1,"/", len(sdkmsg.Tasklab.Sdkinfos)) return } // create server func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) { logger.Info("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewServer(deliver.Mode(mode), url) if socket.Sock == nil { return socket, errors.New("create listen error") } SocketManage[sdkid] = socket return socket, nil } func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { logger.Info("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewClient(deliver.Mode(mode), url) if socket.Sock == nil { return sdkid, socket, errors.New("create listen error") } SocketManage[sdkid] = socket return sdkid, socket, nil } func Recv(socket SocketContext) { var repsdkmsg = protomsg.SdkMessage{} for { select { case <-socket.Context.Done(): logger.Info("socket close") return default: if msg, err := socket.Sock.Recv(); err != nil { continue } else { err = proto.Unmarshal(msg, &repsdkmsg) if err != nil { logger.Error("unmarshal error: ", err) continue } repsdkmsg.Tasklab.Index++ //调用计算函数, 分发给下一个主题 nexttopic := SdkSendTopic(repsdkmsg) SdkMap[nexttopic] <- repsdkmsg } } } } func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) { for { select { case <-socket.Context.Done(): logger.Info("socket is close") return case v, ok := <-in: if ok { data, err :=v.Marshal() if err != nil { logger.Error("proto marshal error ", err) continue } if err := socket.Sock.Send(data); err != nil { logger.Error("failed send") continue } logger.Debug(sdkid, " send success: ", len(data)) } else { logger.Debug(sdkid, " 主题关闭, 关闭send()") return } } } }