| camera/camera.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| main.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| sdk/sdk.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| tasktag/tasktag.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| util/sqlite.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| util/util.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
camera/camera.go
@@ -182,7 +182,7 @@ if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") tryCount = 0 continue } @@ -207,7 +207,7 @@ } else { taskIDs := GetAllTaskByID(imagemsg.Cid) for _, taskID := range taskIDs { // logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID) logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID) doTaskList(imagemsg.Cid, "", taskID, recvmessage) } } @@ -244,13 +244,17 @@ // 计算分发的主题 SendTopic := sdk.GetSdkSendTopic(sdkmsg) //logger.Info(SendTopic) if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { logger.Info(SendTopic) } if _, ok := sdk.SdkMap[SendTopic]; ok { logger.Debug("recv from camera id: ", cid, " len: ", len(data), " send to sdk id: ", SendTopic) if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { logger.Info(SendTopic) } sdk.SdkMap[SendTopic] <- sdkmsg if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic { logger.Info("dispute sendtopic success", SendTopic) } } else { logger.Info("分发的主题不存在") } main.go
@@ -1,15 +1,14 @@ package main import ( "flag" "net/http" "flag" _ "net/http/pprof" "time" "taskpubsub/camera" "taskpubsub/sdk" "taskpubsub/tasktag" "taskpubsub/util" "time" "basic.com/valib/logger.git" "github.com/spf13/viper" ) @@ -18,14 +17,14 @@ var useShm bool const ( configFilePath = "/opt/vasystem/config/" configFilePath = "/opt/vasystem/config/" configFileType = "yaml" LOGBASEPATH = "/data/disk1/valog/taskpubsub.log" LOGBASEPATH = "/data/disk1/valog/taskpubsub.log" ) var envirment = flag.String("e", "pro", "") func init() { func init(){ viper.SetConfigType(configFileType) viper.SetConfigName(*envirment) viper.AddConfigPath(configFilePath) @@ -36,33 +35,33 @@ //panic(err) } var logFile = LOGBASEPATH var logFile = LOGBASEPATH if viper.GetString("LogBasePath") != "" { logFile = viper.GetString("LogBasePath") + "/taskpubsub.log" } // 日志初始化 // 日志初始化 if viper.IsSet("LogLevel") && viper.GetInt("LogLevel") >= logger.PanicLevel && viper.GetInt("LogLevel") <= logger.DebugLevel { logger.Config(logFile, viper.GetInt("LogLevel")) } else { }else{ logger.Config(logFile, logger.DebugLevel) } var logSaveDays = 15 logger.SetSaveDays(logSaveDays) var logSaveDays = 15 logger.SetSaveDays(logSaveDays) logger.Info("loginit success !") flag.BoolVar(&useShm, "shm", false, "use shm for performance") } } func main() { flag.Parse() time.Sleep(time.Second) flag.Parse() time.Sleep(time.Second) if useShm { if useShm{ logger.Info("USE SHARE MEMORY") } else { }else{ logger.Info("USE PIPE") } // pprof 用于分析性能 @@ -73,8 +72,8 @@ go util.Init(initchan) logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan) sdk.Init(useShm) // 获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行) tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签 camera.Init(useShm) //获取cid, taskid, sdkid ,关系 sdk.Init(useShm) // 获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行) tasktag.Init() // 获取所有任务,建立任务标签, 在数据进入时, 打标签 camera.Init(useShm) //获取cid, taskid, sdkid ,关系 select {} } sdk/sdk.go
@@ -21,6 +21,25 @@ var shm bool = false var innerRecvTopic = []string{ "facedetect-sdk-no-track", //to sdk-no-track 以图搜图 } var innerSendTopic = []string{ "facedetect-sdk-no-track", //to sdk-no-track 以图搜图 "virtual-faceextract-sdk-pull", //to web 以图搜图 } func initInnerTopic() { // for _, sendTopic := range innerSendTopic { // createSdkSendServerAndListen(sendTopic) // } // for _, recvTopic := range innerRecvTopic { // createSdkRecvServerAndListen(recvTopic) // } } func Init(useShm bool) { shm = useShm @@ -28,7 +47,16 @@ logger.Info("============= init sdk info =====================") for _, sdkid := range util.Sdklist { // 创建sdk server createSdkTopicAndServer(sdkid) logger.Info() } // 手动输入的主题 initInnerTopic() // es SdkMap["es"] = make(chan protomsg.SdkMessage) logger.Info("create es channel: ") go DealEsTopic() go autoUpdateSdk(util.Sdkflag) } @@ -55,6 +83,7 @@ func createSdkSendServerAndListen(id string) { if _, isExist := SdkMap[id]; !isExist { //不存在 SdkMap[id] = make(chan protomsg.SdkMessage) logger.Info("create", id) } url := "ipc:///tmp/" + id + postPush + ".ipc" @@ -64,15 +93,15 @@ url = id + postPush } logger.Info("SDK URL: ", url) socket, err := util.NewSocketListen(int(m), url, shm) if err != nil { delete(SdkMap, id) logger.Error(id, "create socket error!") return } SocketManage[id] = socket logger.Info("SDK URL Send: ", url) go Send(id, socket, SdkMap[id]) } @@ -98,9 +127,7 @@ logger.Error(id, "create socket error!") return } SocketManage[id] = socket logger.Info("SDK URL Recv: ", url) go Recv(socket) } @@ -126,6 +153,14 @@ } newSdkList := util.Sdklist // 手动添加的全部加上 for _, sendTopic := range innerSendTopic { newSdkList = append(newSdkList, sendTopic) } for _, recvTopic := range innerRecvTopic { newSdkList = append(newSdkList, recvTopic) } sdkListUpdate := util.Difference(oldSdkList, newSdkList) logger.Info(sdkListUpdate) @@ -159,13 +194,13 @@ func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid // if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && // "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && // "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype { // //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" { // logger.Info("----------Sdktype:yitusoutu") // logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) // } if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype && "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype { //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" { logger.Info("----------Sdktype:yitusoutu") logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype) } } else { sendTopic = "es" } @@ -195,7 +230,7 @@ if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") tryCount = 0 continue } @@ -203,7 +238,6 @@ } continue } else { err = proto.Unmarshal(msg, &repsdkmsg) if err != nil { logger.Error("unmarshal error: ", err) @@ -212,8 +246,10 @@ repsdkmsg.Tasklab.Index++ //调用计算函数, 分发给下一个主题 nexttopic := GetSdkSendTopic(repsdkmsg) if "facedetect-sdk-no-track" == nexttopic || "virtual-faceextract-sdk-pull" == nexttopic { logger.Info("nexttopic:", nexttopic) } SdkMap[nexttopic] <- repsdkmsg logger.Info("recv from URL: ", socket.URL, " success: ", len(msg), " send to: ", repsdkmsg) } } } @@ -239,6 +275,9 @@ } if err := socket.Sock.Send(data); err != nil { if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { logger.Error("failed send:sdkid=", sdkid) } // tryCount++ // socket = util.MaybeRestartSocket(socket, &tryCount) @@ -247,7 +286,7 @@ if tryCount > util.ShmMaxTryCount { socket.Sock.Close() socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM) // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT") tryCount = 0 continue } @@ -256,9 +295,9 @@ continue } logger.Info("send to sdk id: ", sdkid, " success: ", len(data)) if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid { logger.Info(sdkid, " send success: ", len(data)) } } else { logger.Debug(sdkid, " 主题关闭, 关闭send()") return tasktag/tasktag.go
@@ -3,10 +3,9 @@ import ( "sync" "taskpubsub/util" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "taskpubsub/util" ) var TaskLabelMap sync.Map @@ -46,6 +45,7 @@ } updateTaskLabelMap(newtls) TaskLabelMap.Range(func(k, v interface{}) bool { logger.Info(k, v) return true }) } util/sqlite.go
@@ -71,26 +71,22 @@ logger.Info("update camera") CameraIds = camval.FindAll() Cameraflag <- true logger.Info(CameraIds) logger.Info("update camera finish.") case protomsg.TableChanged_T_File: logger.Info("update analysis files") FileArr, _ = fileApi.GetAnalysisFiles() Fileflag <- true logger.Info("analysis files:", FileArr) logger.Info("update files finish.") case protomsg.TableChanged_T_CameraTask: logger.Info("update cameratask") CameraTasks = camval.FindAllCameraAndTask() logger.Info(CameraTasks) logger.Info("update cameratask finished!") case protomsg.TableChanged_T_TaskSdk: logger.Info("update tasksdk") TaskSdks = taskapi.FindAllTaskSdkRun() TaskSdkflag <- true logger.Info(TaskSdks) logger.Info("update tasksdk finished!") case protomsg.TableChanged_T_Sdk: @@ -98,8 +94,6 @@ Sdklist = sdkapi.GetAllSdkIds() Sdkinfos = sdkapi.FindAll("") Sdkflag <- true logger.Info(Sdklist) logger.Info(Sdkinfos) logger.Info("update Sdkinfos finished!") default: util/util.go
@@ -97,6 +97,7 @@ // create server func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) { logger.Info("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx