package sdk import ( "taskpubsub/tasktag" "taskpubsub/util" "github.com/gogo/protobuf/proto" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" "basic.com/valib/logger.git" ) const ( postPush = "_1" postPull = "_2" ) var SocketManage = make(map[string]util.SocketContext) var SdkMap = make(map[string]chan protomsg.SdkMessage) var shm bool = false func Init(useShm bool) { shm = useShm logger.Info("============= init sdk info =====================") for _, sdkid := range util.Sdklist { // 创建sdk server createSdkTopicAndServer(sdkid) } go autoUpdateSdk(util.Sdkflag) } func createSdkTopicAndServer(sdkid string) { createSdkSendServerAndListen(sdkid) createSdkRecvServerAndListen(sdkid) } func deleteSdkTopicAndServer(id string) { if _, isExist := SdkMap[id]; isExist { //存在 close(SdkMap[id]) delete(SdkMap, id) logger.Info("删除主题 sdk: ", id) } if _, isExist := SocketManage[id]; isExist { //存在 SocketManage[id].Cancel() delete(SocketManage, id) logger.Info("删除server sdk: ", id) } } func createSdkSendServerAndListen(id string) { if _, isExist := SdkMap[id]; !isExist { //不存在 SdkMap[id] = make(chan protomsg.SdkMessage) } url := "ipc:///tmp/" + id + postPush + ".ipc" m := deliver.PushPull if shm { m = deliver.Shm url = id + postPush } socket, err := util.NewSocketListen(int(m), url, shm) if err != nil { delete(SdkMap, id) logger.Error(id, "create socket error!") return } SocketManage[id] = socket logger.Info("SDK URL Send: ", url) go Send(id, socket, SdkMap[id]) } func createSdkRecvServerAndListen(id string) { if _, isExist := SdkMap[id]; !isExist { //不存在 SdkMap[id] = make(chan protomsg.SdkMessage) logger.Info("create", id) } url := "ipc:///tmp/" + id + postPull + ".ipc" m := deliver.PushPull if shm { m = deliver.Shm url = id + postPull } socket, err := util.NewSocketListen(int(m), url, shm) if err != nil { delete(SdkMap, id) logger.Error(id, "create socket error!") return } SocketManage[id] = socket logger.Info("SDK URL Recv: ", url) go Recv(socket) } //单独处理 es 主题的情况 func DealEsTopic() { for { select { case <-SdkMap["es"]: //logger.Info("es finanl sdk!") } } } //动态处理 func autoUpdateSdk(sdkflag chan bool) { for _ = range sdkflag { logger.Info("test autodelsdk") var oldSdkList []string for key := range SdkMap { oldSdkList = append(oldSdkList, key) } newSdkList := util.Sdklist sdkListUpdate := util.Difference(oldSdkList, newSdkList) logger.Info(sdkListUpdate) for key, op := range sdkListUpdate { if op == "add" { createSdkTopicAndServer(key) } else { deleteSdkTopicAndServer(key) } } } } //sdk数据 加工器 func ToSdkMsg(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage { var sdkmsg = protomsg.SdkMessage{} sdkmsg.Cid = cid sdkmsg.Caddr = caddr if val, ok := tasktag.TaskLabelMap.Load(taskid); !ok { sdkmsg.Tasklab = nil return sdkmsg } else { sdkmsg.Tasklab = val.(*protomsg.TaskLabel) sdkmsg.Data = data } return sdkmsg } //sdk数据分发器 func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid // if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && // "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && // "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype { // //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" { // logger.Info("----------Sdktype:yitusoutu") // logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) // } } else { sendTopic = "es" } return sendTopic } func Recv(socket util.SocketContext) { tryCount := 0 var repsdkmsg = protomsg.SdkMessage{} for { select { case <-socket.Context.Done(): socket.Sock.Close() logger.Info("socket close") return default: if msg, err := socket.Sock.Recv(); err != nil { // tryCount++ // socket = util.MaybeRestartSocket(socket, &tryCount) if socket.UseSHM { if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") tryCount = 0 continue } tryCount++ } continue } else { err = proto.Unmarshal(msg, &repsdkmsg) if err != nil { logger.Error("unmarshal error: ", err) continue } repsdkmsg.Tasklab.Index++ //调用计算函数, 分发给下一个主题 nexttopic := GetSdkSendTopic(repsdkmsg) SdkMap[nexttopic] <- repsdkmsg logger.Info("recv from URL: ", socket.URL, " success: ", len(msg), " send to: ", repsdkmsg) } } } } func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) { tryCount := 0 for { select { case <-socket.Context.Done(): socket.Sock.Close() logger.Info("socket is close") return case v, ok := <-in: if ok { data, err := v.Marshal() if err != nil { logger.Error("proto marshal error ", err) continue } if err := socket.Sock.Send(data); err != nil { // tryCount++ // socket = util.MaybeRestartSocket(socket, &tryCount) if socket.UseSHM { if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") tryCount = 0 continue } tryCount++ } continue } logger.Info("send to sdk id: ", sdkid, " success: ", len(data)) } else { logger.Debug(sdkid, " 主题关闭, 关闭send()") return } } } }