From a61cd237d45c7bec7a3e0cdf32ed33c49b41b73b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 16 十二月 2019 17:41:12 +0800
Subject: [PATCH] update
---
tasktag/tasktag.go | 4
main.go | 41 ++++++-------
sdk/sdk.go | 75 +++++++++++++++++++------
util/sqlite.go | 6 --
util/util.go | 1
camera/camera.go | 18 +++--
6 files changed, 91 insertions(+), 54 deletions(-)
diff --git a/camera/camera.go b/camera/camera.go
index 0901ad4..ee20b9e 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -182,7 +182,7 @@
if tryCount > util.ShmMaxTryCount {
socket.Sock.Close()
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
- // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
+ logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
tryCount = 0
continue
}
@@ -207,7 +207,7 @@
} else {
taskIDs := GetAllTaskByID(imagemsg.Cid)
for _, taskID := range taskIDs {
- // logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
+ logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
doTaskList(imagemsg.Cid, "", taskID, recvmessage)
}
}
@@ -244,13 +244,17 @@
// 璁$畻鍒嗗彂鐨勪富棰�
SendTopic := sdk.GetSdkSendTopic(sdkmsg)
//logger.Info(SendTopic)
-
+ if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic {
+ logger.Info(SendTopic)
+ }
if _, ok := sdk.SdkMap[SendTopic]; ok {
-
- logger.Debug("recv from camera id: ", cid, " len: ", len(data), " send to sdk id: ", SendTopic)
-
+ if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic {
+ logger.Info(SendTopic)
+ }
sdk.SdkMap[SendTopic] <- sdkmsg
-
+ if "facedetect-sdk-no-track" == SendTopic || "virtual-faceextract-sdk-pull" == SendTopic {
+ logger.Info("dispute sendtopic success", SendTopic)
+ }
} else {
logger.Info("鍒嗗彂鐨勪富棰樹笉瀛樺湪")
}
diff --git a/main.go b/main.go
index 535173e..263c91a 100644
--- a/main.go
+++ b/main.go
@@ -1,15 +1,14 @@
package main
import (
- "flag"
"net/http"
+ "flag"
_ "net/http/pprof"
+ "time"
"taskpubsub/camera"
"taskpubsub/sdk"
"taskpubsub/tasktag"
"taskpubsub/util"
- "time"
-
"basic.com/valib/logger.git"
"github.com/spf13/viper"
)
@@ -18,14 +17,14 @@
var useShm bool
const (
- configFilePath = "/opt/vasystem/config/"
+ configFilePath = "/opt/vasystem/config/"
configFileType = "yaml"
- LOGBASEPATH = "/data/disk1/valog/taskpubsub.log"
+ LOGBASEPATH = "/data/disk1/valog/taskpubsub.log"
)
var envirment = flag.String("e", "pro", "")
-func init() {
+func init(){
viper.SetConfigType(configFileType)
viper.SetConfigName(*envirment)
viper.AddConfigPath(configFilePath)
@@ -36,33 +35,33 @@
//panic(err)
}
- var logFile = LOGBASEPATH
+ var logFile = LOGBASEPATH
if viper.GetString("LogBasePath") != "" {
logFile = viper.GetString("LogBasePath") + "/taskpubsub.log"
}
- // 鏃ュ織鍒濆鍖�
+ // 鏃ュ織鍒濆鍖�
if viper.IsSet("LogLevel") &&
viper.GetInt("LogLevel") >= logger.PanicLevel &&
viper.GetInt("LogLevel") <= logger.DebugLevel {
logger.Config(logFile, viper.GetInt("LogLevel"))
- } else {
+ }else{
logger.Config(logFile, logger.DebugLevel)
}
- var logSaveDays = 15
- logger.SetSaveDays(logSaveDays)
+ var logSaveDays = 15
+ logger.SetSaveDays(logSaveDays)
logger.Info("loginit success !")
-
+
flag.BoolVar(&useShm, "shm", false, "use shm for performance")
-}
-
+}
+
func main() {
- flag.Parse()
- time.Sleep(time.Second)
+ flag.Parse()
+ time.Sleep(time.Second)
- if useShm {
+ if useShm{
logger.Info("USE SHARE MEMORY")
- } else {
+ }else{
logger.Info("USE PIPE")
}
// pprof 鐢ㄤ簬鍒嗘瀽鎬ц兘
@@ -73,8 +72,8 @@
go util.Init(initchan)
logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
- sdk.Init(useShm) // 鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
- tasktag.Init() // 鑾峰彇鎵�鏈変换鍔★紝寤虹珛浠诲姟鏍囩锛� 鍦ㄦ暟鎹繘鍏ユ椂锛� 鎵撴爣绛�
- camera.Init(useShm) //鑾峰彇cid, taskid, sdkid ,鍏崇郴
+ sdk.Init(useShm) // 鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
+ tasktag.Init() // 鑾峰彇鎵�鏈変换鍔★紝寤虹珛浠诲姟鏍囩锛� 鍦ㄦ暟鎹繘鍏ユ椂锛� 鎵撴爣绛�
+ camera.Init(useShm) //鑾峰彇cid, taskid, sdkid ,鍏崇郴
select {}
}
diff --git a/sdk/sdk.go b/sdk/sdk.go
index 0941757..88c0445 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -21,6 +21,25 @@
var shm bool = false
+var innerRecvTopic = []string{
+ "facedetect-sdk-no-track", //to sdk-no-track 浠ュ浘鎼滃浘
+}
+
+var innerSendTopic = []string{
+ "facedetect-sdk-no-track", //to sdk-no-track 浠ュ浘鎼滃浘
+ "virtual-faceextract-sdk-pull", //to web 浠ュ浘鎼滃浘
+}
+
+func initInnerTopic() {
+ // for _, sendTopic := range innerSendTopic {
+ // createSdkSendServerAndListen(sendTopic)
+ // }
+
+ // for _, recvTopic := range innerRecvTopic {
+ // createSdkRecvServerAndListen(recvTopic)
+ // }
+}
+
func Init(useShm bool) {
shm = useShm
@@ -28,7 +47,16 @@
logger.Info("============= init sdk info =====================")
for _, sdkid := range util.Sdklist { // 鍒涘缓sdk server
createSdkTopicAndServer(sdkid)
+ logger.Info()
}
+
+ // 鎵嬪姩杈撳叆鐨勪富棰�
+ initInnerTopic()
+
+ // es
+ SdkMap["es"] = make(chan protomsg.SdkMessage)
+ logger.Info("create es channel: ")
+ go DealEsTopic()
go autoUpdateSdk(util.Sdkflag)
}
@@ -55,6 +83,7 @@
func createSdkSendServerAndListen(id string) {
if _, isExist := SdkMap[id]; !isExist { //涓嶅瓨鍦�
SdkMap[id] = make(chan protomsg.SdkMessage)
+ logger.Info("create", id)
}
url := "ipc:///tmp/" + id + postPush + ".ipc"
@@ -64,15 +93,15 @@
url = id + postPush
}
+ logger.Info("SDK URL: ", url)
+
socket, err := util.NewSocketListen(int(m), url, shm)
if err != nil {
delete(SdkMap, id)
logger.Error(id, "create socket error!")
return
}
-
SocketManage[id] = socket
- logger.Info("SDK URL Send: ", url)
go Send(id, socket, SdkMap[id])
}
@@ -98,9 +127,7 @@
logger.Error(id, "create socket error!")
return
}
-
SocketManage[id] = socket
- logger.Info("SDK URL Recv: ", url)
go Recv(socket)
}
@@ -126,6 +153,14 @@
}
newSdkList := util.Sdklist
+
+ // 鎵嬪姩娣诲姞鐨勫叏閮ㄥ姞涓�
+ for _, sendTopic := range innerSendTopic {
+ newSdkList = append(newSdkList, sendTopic)
+ }
+ for _, recvTopic := range innerRecvTopic {
+ newSdkList = append(newSdkList, recvTopic)
+ }
sdkListUpdate := util.Difference(oldSdkList, newSdkList)
logger.Info(sdkListUpdate)
@@ -159,13 +194,13 @@
func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) {
if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
- // if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype &&
- // "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype &&
- // "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype {
- // //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" {
- // logger.Info("----------Sdktype:yitusoutu")
- // logger.Info("鍒嗗彂鐨勪富棰�:", sendTopic, "!Sdktype锛�", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype)
- // }
+ if "Yolo" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype &&
+ "FaceDetect" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype &&
+ "FaceCompare" != sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype {
+ //if sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype == "" {
+ logger.Info("----------Sdktype:yitusoutu")
+ logger.Info("鍒嗗彂鐨勪富棰�:", sendTopic, "!Sdktype锛�", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype)
+ }
} else {
sendTopic = "es"
}
@@ -195,7 +230,7 @@
if tryCount > util.ShmMaxTryCount {
socket.Sock.Close()
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
- // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
+ logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
tryCount = 0
continue
}
@@ -203,7 +238,6 @@
}
continue
} else {
-
err = proto.Unmarshal(msg, &repsdkmsg)
if err != nil {
logger.Error("unmarshal error: ", err)
@@ -212,8 +246,10 @@
repsdkmsg.Tasklab.Index++
//璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
nexttopic := GetSdkSendTopic(repsdkmsg)
+ if "facedetect-sdk-no-track" == nexttopic || "virtual-faceextract-sdk-pull" == nexttopic {
+ logger.Info("nexttopic:", nexttopic)
+ }
SdkMap[nexttopic] <- repsdkmsg
- logger.Info("recv from URL: ", socket.URL, " success: ", len(msg), " send to: ", repsdkmsg)
}
}
}
@@ -239,6 +275,9 @@
}
if err := socket.Sock.Send(data); err != nil {
+ if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
+ logger.Error("failed send:sdkid=", sdkid)
+ }
// tryCount++
// socket = util.MaybeRestartSocket(socket, &tryCount)
@@ -247,7 +286,7 @@
if tryCount > util.ShmMaxTryCount {
socket.Sock.Close()
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
- // logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
+ logger.Info("SDK SEND SHM TRY :", util.ShmMaxTryCount, " RESTART IT")
tryCount = 0
continue
}
@@ -256,9 +295,9 @@
continue
}
-
- logger.Info("send to sdk id: ", sdkid, " success: ", len(data))
-
+ if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
+ logger.Info(sdkid, " send success: ", len(data))
+ }
} else {
logger.Debug(sdkid, " 涓婚鍏抽棴, 鍏抽棴send()")
return
diff --git a/tasktag/tasktag.go b/tasktag/tasktag.go
index 75c20d1..dca8b78 100644
--- a/tasktag/tasktag.go
+++ b/tasktag/tasktag.go
@@ -3,10 +3,9 @@
import (
"sync"
- "taskpubsub/util"
-
"basic.com/pubsub/protomsg.git"
"basic.com/valib/logger.git"
+ "taskpubsub/util"
)
var TaskLabelMap sync.Map
@@ -46,6 +45,7 @@
}
updateTaskLabelMap(newtls)
TaskLabelMap.Range(func(k, v interface{}) bool {
+ logger.Info(k, v)
return true
})
}
diff --git a/util/sqlite.go b/util/sqlite.go
index 7deb0b4..3ee87ca 100644
--- a/util/sqlite.go
+++ b/util/sqlite.go
@@ -71,26 +71,22 @@
logger.Info("update camera")
CameraIds = camval.FindAll()
Cameraflag <- true
- logger.Info(CameraIds)
logger.Info("update camera finish.")
case protomsg.TableChanged_T_File:
logger.Info("update analysis files")
FileArr, _ = fileApi.GetAnalysisFiles()
Fileflag <- true
- logger.Info("analysis files:", FileArr)
logger.Info("update files finish.")
case protomsg.TableChanged_T_CameraTask:
logger.Info("update cameratask")
CameraTasks = camval.FindAllCameraAndTask()
- logger.Info(CameraTasks)
logger.Info("update cameratask finished!")
case protomsg.TableChanged_T_TaskSdk:
logger.Info("update tasksdk")
TaskSdks = taskapi.FindAllTaskSdkRun()
TaskSdkflag <- true
- logger.Info(TaskSdks)
logger.Info("update tasksdk finished!")
case protomsg.TableChanged_T_Sdk:
@@ -98,8 +94,6 @@
Sdklist = sdkapi.GetAllSdkIds()
Sdkinfos = sdkapi.FindAll("")
Sdkflag <- true
- logger.Info(Sdklist)
- logger.Info(Sdkinfos)
logger.Info("update Sdkinfos finished!")
default:
diff --git a/util/util.go b/util/util.go
index 88a52ba..e3b8197 100644
--- a/util/util.go
+++ b/util/util.go
@@ -97,6 +97,7 @@
// create server
func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) {
+ logger.Info("url is: ", url)
ctx, cancel := context.WithCancel(context.Background())
socket.Context = ctx
--
Gitblit v1.8.0