| | |
| | | if tryCount > util.ShmMaxTryCount { |
| | | socket.Sock.Close() |
| | | socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) |
| | | logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | tryCount = 0 |
| | | continue |
| | | } |
| | |
| | | } else { |
| | | taskIDs := GetAllTaskByID(imagemsg.Cid) |
| | | for _, taskID := range taskIDs { |
| | | logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID) |
| | | // logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID) |
| | | doTaskList(imagemsg.Cid, "", taskID, recvmessage) |
| | | } |
| | | } |
| | |
| | | // compute the topic this message is dispatched to |
| | | SendTopic := sdk.GetSdkSendTopic(sdkmsg) |
| | | //logger.Info(SendTopic) |
| | | if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { |
| | | logger.Info(SendTopic) |
| | | } |
| | | |
| | | if _, ok := sdk.SdkMap[SendTopic]; ok { |
| | | if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { |
| | | logger.Info(SendTopic) |
| | | } |
| | | |
| | | logger.Debug("recv from camera id: ", cid, " len: ", len(data), " send to sdk id: ", SendTopic) |
| | | |
| | | sdk.SdkMap[SendTopic] <- sdkmsg |
| | | if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { |
| | | logger.Info("dispute sendtopic success", SendTopic) |
| | | } |
| | | |
| | | } else { |
| | | logger.Info("分发的主题不存在") |
| | | } |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "flag" |
| | | "net/http" |
| | | "flag" |
| | | _ "net/http/pprof" |
| | | "time" |
| | | "taskpubsub/camera" |
| | | "taskpubsub/sdk" |
| | | "taskpubsub/tasktag" |
| | | "taskpubsub/util" |
| | | "time" |
| | | |
| | | "basic.com/valib/logger.git" |
| | | "github.com/spf13/viper" |
| | | ) |
| | |
| | | var useShm bool |
| | | |
| | | const ( |
| | | configFilePath = "/opt/vasystem/config/" |
| | | configFilePath = "/opt/vasystem/config/" |
| | | configFileType = "yaml" |
| | | LOGBASEPATH = "/data/disk1/valog/taskpubsub.log" |
| | | LOGBASEPATH = "/data/disk1/valog/taskpubsub.log" |
| | | ) |
| | | |
| | | var envirment = flag.String("e", "pro", "") |
| | | |
| | | func init(){ |
| | | func init() { |
| | | viper.SetConfigType(configFileType) |
| | | viper.SetConfigName(*envirment) |
| | | viper.AddConfigPath(configFilePath) |
| | |
| | | //panic(err) |
| | | } |
| | | |
| | | var logFile = LOGBASEPATH |
| | | var logFile = LOGBASEPATH |
| | | if viper.GetString("LogBasePath") != "" { |
| | | logFile = viper.GetString("LogBasePath") + "/taskpubsub.log" |
| | | } |
| | | |
| | | // initialize logging |
| | | // initialize logging |
| | | if viper.IsSet("LogLevel") && |
| | | viper.GetInt("LogLevel") >= logger.PanicLevel && |
| | | viper.GetInt("LogLevel") <= logger.DebugLevel { |
| | | logger.Config(logFile, viper.GetInt("LogLevel")) |
| | | }else{ |
| | | } else { |
| | | logger.Config(logFile, logger.DebugLevel) |
| | | } |
| | | var logSaveDays = 15 |
| | | logger.SetSaveDays(logSaveDays) |
| | | var logSaveDays = 15 |
| | | logger.SetSaveDays(logSaveDays) |
| | | logger.Info("loginit success !") |
| | | |
| | | flag.BoolVar(&useShm, "shm", false, "use shm for performance") |
| | | } |
| | | |
| | | func main() { |
| | | flag.Parse() |
| | | time.Sleep(time.Second) |
| | | |
| | | if useShm{ |
| | | flag.BoolVar(&useShm, "shm", false, "use shm for performance") |
| | | } |
| | | |
| | | func main() { |
| | | flag.Parse() |
| | | time.Sleep(time.Second) |
| | | |
| | | if useShm { |
| | | logger.Info("USE SHARE MEMORY") |
| | | }else{ |
| | | } else { |
| | | logger.Info("USE PIPE") |
| | | } |
| | | // pprof is used for performance profiling |
| | |
| | | go util.Init(initchan) |
| | | |
| | | logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan) |
| | | sdk.Init(useShm) // 获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行) |
| | | tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签 |
| | | camera.Init(useShm) //获取cid, taskid, sdkid ,关系 |
| | | sdk.Init(useShm) // 获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行) |
| | | tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签 |
| | | camera.Init(useShm) //获取cid, taskid, sdkid ,关系 |
| | | select {} |
| | | } |
| | |
| | | |
| | | var shm bool = false |
| | | |
| | | var innerRecvTopic = []string{ |
| | | "facedetect-sdk-no-track", // to sdk-no-track: search by image |
| | | } |
| | | |
| | | var innerSendTopic = []string{ |
| | | "facedetect-sdk-no-track", // to sdk-no-track: search by image |
| | | "virtual-faceextract-sdk-pull", // to web: search by image |
| | | } |
| | | |
| | | func initInnerTopic() { |
| | | // NOTE: the whole body is commented out, so this function is currently a no-op. Disabled code: for _, sendTopic := range innerSendTopic { |
| | | // createSdkSendServerAndListen(sendTopic) |
| | | // } |
| | | |
| | | // Disabled code (recv side): for _, recvTopic := range innerRecvTopic { |
| | | // createSdkRecvServerAndListen(recvTopic) |
| | | // } |
| | | } |
| | | |
| | | func Init(useShm bool) { |
| | | |
| | | shm = useShm |
| | |
| | | logger.Info("============= init sdk info =====================") |
| | | for _, sdkid := range util.Sdklist { // create the sdk topic and server for every id in util.Sdklist |
| | | createSdkTopicAndServer(sdkid) |
| | | logger.Info() |
| | | } |
| | | |
| | | // manually listed (inner) topics |
| | | initInnerTopic() |
| | | |
| | | // es: register an unbuffered channel under the "es" key and start DealEsTopic in its own goroutine |
| | | SdkMap["es"] = make(chan protomsg.SdkMessage) |
| | | logger.Info("create es channel: ") |
| | | go DealEsTopic() |
| | | |
| | | go autoUpdateSdk(util.Sdkflag) |
| | | } |
| | |
| | | func createSdkSendServerAndListen(id string) { |
| | | if _, isExist := SdkMap[id]; !isExist { // no channel registered for this id yet: create one |
| | | SdkMap[id] = make(chan protomsg.SdkMessage) |
| | | logger.Info("create", id) |
| | | } |
| | | |
| | | url := "ipc:///tmp/" + id + postPush + ".ipc" |
| | |
| | | url = id + postPush |
| | | } |
| | | |
| | | logger.Info("SDK URL: ", url) |
| | | |
| | | socket, err := util.NewSocketListen(int(m), url, shm) |
| | | if err != nil { |
| | | delete(SdkMap, id) |
| | | logger.Error(id, "create socket error!") |
| | | return |
| | | } |
| | | |
| | | SocketManage[id] = socket |
| | | logger.Info("SDK URL Send: ", url) |
| | | |
| | | go Send(id, socket, SdkMap[id]) |
| | | } |
| | |
| | | logger.Error(id, "create socket error!") |
| | | return |
| | | } |
| | | |
| | | SocketManage[id] = socket |
| | | logger.Info("SDK URL Recv: ", url) |
| | | |
| | | go Recv(socket) |
| | | } |
| | |
| | | } |
| | | |
| | | newSdkList := util.Sdklist |
| | | |
| | | // append every manually added topic |
| | | for _, sendTopic := range innerSendTopic { |
| | | newSdkList = append(newSdkList, sendTopic) |
| | | } |
| | | for _, recvTopic := range innerRecvTopic { |
| | | newSdkList = append(newSdkList, recvTopic) |
| | | } |
| | | |
| | | sdkListUpdate := util.Difference(oldSdkList, newSdkList) |
| | | logger.Info(sdkListUpdate) |
| | |
| | | func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) { |
| | | if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { |
| | | sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid |
| | | if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && |
| | | "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && |
| | | "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype { |
| | | //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" { |
| | | logger.Info("----------Sdktype:yitusoutu") |
| | | logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) |
| | | } |
| | | // if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && |
| | | // "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && |
| | | // "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype { |
| | | // //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" { |
| | | // logger.Info("----------Sdktype:yitusoutu") |
| | | // logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) |
| | | // } |
| | | } else { |
| | | sendTopic = "es" |
| | | } |
| | |
| | | if tryCount > util.ShmMaxTryCount { |
| | | socket.Sock.Close() |
| | | socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) |
| | | logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | tryCount = 0 |
| | | continue |
| | | } |
| | |
| | | } |
| | | continue |
| | | } else { |
| | | |
| | | err = proto.Unmarshal(msg, &repsdkmsg) |
| | | if err != nil { |
| | | logger.Error("unmarshal error: ", err) |
| | |
| | | repsdkmsg.Tasklab.Index++ |
| | | // call the topic computation function and dispatch to the next topic |
| | | nexttopic := GetSdkSendTopic(repsdkmsg) |
| | | if "facedetect-sdk-no-track" == nexttopic || "virtual-faceextract-sdk-pull" == nexttopic { |
| | | logger.Info("nexttopic:", nexttopic) |
| | | } |
| | | SdkMap[nexttopic] <- repsdkmsg |
| | | logger.Info("recv from URL: ", socket.URL, " success: ", len(msg), " send to: ", repsdkmsg) |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | if err := socket.Sock.Send(data); err != nil { |
| | | if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { |
| | | logger.Error("failed send:sdkid=", sdkid) |
| | | } |
| | | |
| | | // tryCount++ |
| | | // socket = util.MaybeRestartSocket(socket, &tryCount) |
| | |
| | | if tryCount > util.ShmMaxTryCount { |
| | | socket.Sock.Close() |
| | | socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) |
| | | logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") |
| | | tryCount = 0 |
| | | continue |
| | | } |
| | |
| | | |
| | | continue |
| | | } |
| | | if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { |
| | | logger.Info(sdkid, " send success: ", len(data)) |
| | | } |
| | | |
| | | logger.Info("send to sdk id: ", sdkid, " success: ", len(data)) |
| | | |
| | | } else { |
| | | logger.Debug(sdkid, " 主题关闭, 关闭send()") |
| | | return |
| | |
| | | import ( |
| | | "sync" |
| | | |
| | | "taskpubsub/util" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/logger.git" |
| | | "taskpubsub/util" |
| | | ) |
| | | |
| | | var TaskLabelMap sync.Map |
| | |
| | | } |
| | | updateTaskLabelMap(newtls) |
| | | TaskLabelMap.Range(func(k, v interface{}) bool { |
| | | logger.Info(k, v) |
| | | return true |
| | | }) |
| | | } |
| | |
| | | package util |
| | | |
| | | import ( |
| | | "flag" |
| | | |
| | | "basic.com/dbapi.git" |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/gopherdiscovery.git" |
| | | "flag" |
| | | "github.com/gogo/protobuf/proto" |
| | | "basic.com/valib/logger.git" |
| | | "github.com/gogo/protobuf/proto" |
| | | ) |
| | | |
| | | /************************* |
| | |
| | | func initDbData(initchan chan bool) { |
| | | CameraIds = camval.FindAll() |
| | | |
| | | FileArr,_ = fileApi.GetAnalysisFiles() |
| | | FileArr, _ = fileApi.GetAnalysisFiles() |
| | | |
| | | logger.Info("==============camera camera with task ================") |
| | | CameraTasks = camval.FindAllCameraAndTask() |
| | | logger.Info(CameraTasks) |
| | | |
| | | TaskSdks = taskapi.FindAllTaskSdkRun() |
| | | |
| | |
| | | logger.Info("update camera finish.") |
| | | case protomsg.TableChanged_T_File: |
| | | logger.Info("update analysis files") |
| | | FileArr,_ = fileApi.GetAnalysisFiles() |
| | | FileArr, _ = fileApi.GetAnalysisFiles() |
| | | Fileflag <- true |
| | | logger.Info("analysis files:",FileArr) |
| | | logger.Info("analysis files:", FileArr) |
| | | logger.Info("update files finish.") |
| | | |
| | | case protomsg.TableChanged_T_CameraTask: |
| | |
| | | |
| | | // create server |
| | | func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) { |
| | | logger.Info("url is: ", url) |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | |
| | | socket.Context = ctx |