package sdk import ( "taskpubsub/tasktag" "taskpubsub/util" "github.com/gogo/protobuf/proto" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" "basic.com/valib/logger.git" ) const ( postPush = "_1" postPull = "_2" ) var SocketManage = make(map[string]util.SocketContext) var SdkMap = make(map[string]chan protomsg.SdkMessage) var shm bool = false var innerRecvTopic = []string{ "facedetect-sdk-no-track", //to sdk-no-track 以图搜图 } var innerSendTopic = []string{ "facedetect-sdk-no-track", //to sdk-no-track 以图搜图 "virtual-faceextract-sdk-pull", //to web 以图搜图 } func initInnerTopic() { // for _, sendTopic := range innerSendTopic { // createSdkSendServerAndListen(sendTopic) // } // for _, recvTopic := range innerRecvTopic { // createSdkRecvServerAndListen(recvTopic) // } } func Init(useShm bool) { shm = useShm logger.Info("============= init sdk info =====================") for _, sdkid := range util.Sdklist { // 创建sdk server createSdkTopicAndServer(sdkid) logger.Info() } // 手动输入的主题 initInnerTopic() // es SdkMap["es"] = make(chan protomsg.SdkMessage) logger.Info("create es channel: ") go DealEsTopic() go autoUpdateSdk(util.Sdkflag) } func createSdkTopicAndServer(sdkid string) { createSdkSendServerAndListen(sdkid) createSdkRecvServerAndListen(sdkid) } func deleteSdkTopicAndServer(id string) { if _, isExist := SdkMap[id]; isExist { //存在 close(SdkMap[id]) delete(SdkMap, id) logger.Info("删除主题 sdk: ", id) } if _, isExist := SocketManage[id]; isExist { //存在 SocketManage[id].Cancel() delete(SocketManage, id) logger.Info("删除server sdk: ", id) } } func createSdkSendServerAndListen(id string) { if _, isExist := SdkMap[id]; !isExist { //不存在 SdkMap[id] = make(chan protomsg.SdkMessage) logger.Info("create", id) } url := "ipc:///tmp/" + id + postPush + ".ipc" m := deliver.PushPull if shm { m = deliver.Shm url = id + postPush } logger.Info("SDK URL: ", url) socket, err := util.NewSocketListen(int(m), url, shm) if err != nil { delete(SdkMap, id) logger.Error(id, "create socket error!") return } SocketManage[id] = socket go Send(id, socket, SdkMap[id]) } func createSdkRecvServerAndListen(id string) { if _, isExist := SdkMap[id]; !isExist { //不存在 SdkMap[id] = make(chan protomsg.SdkMessage) logger.Info("create", id) } url := "ipc:///tmp/" + id + postPull + ".ipc" m := deliver.PushPull if shm { m = deliver.Shm url = id + postPull } socket, err := util.NewSocketListen(int(m), url, shm) if err != nil { delete(SdkMap, id) logger.Error(id, "create socket error!") return } SocketManage[id] = socket go Recv(socket) } //单独处理 es 主题的情况 func DealEsTopic() { for { select { case <-SdkMap["es"]: //logger.Info("es finanl sdk!") } } } //动态处理 func autoUpdateSdk(sdkflag chan bool) { for _ = range sdkflag { logger.Info("test autodelsdk") var oldSdkList []string for key := range SdkMap { oldSdkList = append(oldSdkList, key) } newSdkList := util.Sdklist // 手动添加的全部加上 // for _, sendTopic := range innerSendTopic { // newSdkList = append(newSdkList, sendTopic) // } // for _, recvTopic := range innerRecvTopic { // newSdkList = append(newSdkList, recvTopic) // } sdkListUpdate := util.Difference(oldSdkList, newSdkList) logger.Info(sdkListUpdate) for key, op := range sdkListUpdate { if op == "add" { createSdkTopicAndServer(key) } else { deleteSdkTopicAndServer(key) } } } } //sdk数据 加工器 func ToSdkMsg(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage { var sdkmsg = protomsg.SdkMessage{} sdkmsg.Cid = cid sdkmsg.Caddr = caddr if val, ok := tasktag.TaskLabelMap.Load(taskid); !ok { sdkmsg.Tasklab = nil return sdkmsg } else { sdkmsg.Tasklab = val.(*protomsg.TaskLabel) sdkmsg.Data = data } return sdkmsg } var logShouldUntil = 0 //sdk数据分发器 func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid logShouldUntil++ if logShouldUntil > 68 { logShouldUntil = 0 logger.Info("=========分发的主题=========") logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) } } else { sendTopic = "es" } return sendTopic } func Recv(socket util.SocketContext) { tryCount := 0 var repsdkmsg = protomsg.SdkMessage{} for { select { case <-socket.Context.Done(): socket.Sock.Close() logger.Info("socket close") return default: if msg, err := socket.Sock.Recv(); err != nil { // tryCount++ // socket = util.MaybeRestartSocket(socket, &tryCount) if socket.UseSHM { if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) logger.Info("=========SDK RECV SHM ERROR=========") logger.Info("SDK RECV FROM:", socket.URL, " ERROR ", util.ShmMaxTryCount, " TIMES SO RESTART IT") tryCount = 0 continue } tryCount++ } continue } else { err = proto.Unmarshal(msg, &repsdkmsg) if err != nil { logger.Error("unmarshal error: ", err) continue } repsdkmsg.Tasklab.Index++ //调用计算函数, 分发给下一个主题 nexttopic := GetSdkSendTopic(repsdkmsg) if "facedetect-sdk-no-track" == nexttopic || "virtual-faceextract-sdk-pull" == nexttopic { logger.Info("nexttopic:", nexttopic) } SdkMap[nexttopic] <- repsdkmsg } } } } func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) { tryCount := 0 for { select { case <-socket.Context.Done(): socket.Sock.Close() logger.Info("socket is close") return case v, ok := <-in: if ok { data, err := v.Marshal() if err != nil { logger.Error("proto marshal error ", err) continue } if err := socket.Sock.Send(data); err != nil { if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { logger.Error("failed send:sdkid=", sdkid) } // tryCount++ // socket = util.MaybeRestartSocket(socket, &tryCount) if socket.UseSHM { if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) logger.Info("=========SDK SEND SHM ERROR=========") logger.Info("SDK SEND TO: ", socket.URL, " ERROR ", util.ShmMaxTryCount, " TIMES SO RESTART IT") tryCount = 0 continue } tryCount++ } continue } if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { logger.Info(sdkid, " send success: ", len(data)) } } else { logger.Debug(sdkid, " 主题关闭, 关闭send()") return } } } }