From 94a88b93081043439b20db4adf33721b1e0d6f07 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 20 十二月 2019 16:46:53 +0800 Subject: [PATCH] support hotupdate --- /dev/null | 125 ------------------------- .gitignore | 1 modulemake.sh | 6 go.mod | 29 ----- main.go | 103 ++++++++------------ 5 files changed, 47 insertions(+), 217 deletions(-) diff --git a/.gitignore b/.gitignore index c95911e..d761c0c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ *.jpg data/ runtime/ +libs/ diff --git a/go.mod b/go.mod index d808405..9cca7cb 100644 --- a/go.mod +++ b/go.mod @@ -3,33 +3,10 @@ go 1.12 require ( - basic.com/dbapi.git v0.0.0-20190803074805-04f05c8ca762 - basic.com/libgowrapper/sdkstruct.git v0.0.0-20191211011351-89daaec8738e - basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 - basic.com/pubsub/protomsg.git v0.0.0-20191213031824-8f12ebe28c26 - basic.com/ruleForSdk.git v0.0.0-20190808095604-936797ef5da3 - basic.com/valib/deliver.git v0.0.0-20190830083657-47adbbbb6651 - basic.com/valib/godraw.git v0.0.0-20191122082247-26e9987cd183 + basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865 - basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 - basic.com/valib/gosdk.git v0.0.0-20191127014622-8e01cb7623bf - basic.com/valib/shm.git v0.0.0-20190829074754-ad2e00879627 // indirect - github.com/ajg/form v1.5.1 // indirect github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717 - github.com/disintegration/imaging v1.6.2 - github.com/fatih/color v1.7.0 // indirect - github.com/gogo/protobuf v1.2.1 - github.com/golang/snappy v0.0.1 - github.com/knetic/govaluate v3.0.0+incompatible // indirect - github.com/llgcode/draw2d v0.0.0-20190810100245-79e59b6b8fbc // indirect - github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e // indirect - github.com/mattn/go-colorable v0.1.2 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/phachon/go-logger v0.0.0-20180912060440-89ff8a2898f6 - github.com/pierrec/lz4 v2.0.5+incompatible - github.com/spf13/viper v1.4.0 - github.com/yireyun/go-queue v0.0.0-20180809062148-5e6897360dac - golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect - golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed - nanomsg.org/go-mangos v1.4.0 // indirect + github.com/spf13/viper v1.6.1 + golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a ) diff --git a/main.go b/main.go index 0c7c0fa..8702d5e 100644 --- a/main.go +++ b/main.go @@ -2,17 +2,16 @@ import ( "analysis/logo" - "analysis/proc" "analysis/util" "strconv" + + "analysis/app/master" + "analysis/app/slave" "context" "flag" "os" - "os/signal" - - "golang.org/x/sys/unix" "basic.com/valib/gogpu.git" // "net/http" @@ -27,41 +26,26 @@ gpu int shm bool ruleIPC string - soFile string + + configPath string ) const ( - master = `master` - slave = `slave` - single = `single` + roleMaster = `master` + roleSlave = `slave` ) func init() { - flag.BoolVar(&logit, "logit", false, "use logger,default flase") - flag.StringVar(&role, "role", single, "run process role master/slave/single, default single") + flag.StringVar(&role, "role", roleMaster, "run process role master/slave, default single") flag.StringVar(&runType, "sdk", util.FeatAll, "run sdk type, face/facedetect/facecompare/yolo,etc.") flag.StringVar(&id, "id", util.FakeSdkID, "sdk id as ipc label") - flag.IntVar(&gpu, "gpu", -1, "gpu index") - flag.BoolVar(&shm, "shm", false, "use shm for performance") - flag.StringVar(&soFile, "so-file", "./", "libcffmpeg.so path") + flag.StringVar(&configPath, util.ConfigPath, "", "sdk config file path") - // 浜鸿劯璇嗗埆鍙傛暟 - flag.IntVar(&util.SFI.DetectThrd, util.FaceDetectThread, util.SFI.DetectThrd, "face detect max thread count") - flag.IntVar(&util.SFI.DetectNum, util.FaceDetectNum, util.SFI.DetectNum, "face detect min face count") - flag.IntVar(&util.SFI.DetectAngle, util.FaceDetectAngle, util.SFI.DetectAngle, "face detect face angle") - flag.IntVar(&util.SFI.PropThrd, util.FacePropertyThread, util.SFI.PropThrd, "face property max thread count") - flag.IntVar(&util.SFI.ExtractThrd, util.FaceExtractThread, util.SFI.ExtractThrd, "face extract max thread count") - flag.IntVar(&util.SFI.TrackInterval, util.FaceTrackInterval, util.SFI.TrackInterval, "face track needed interval") - flag.IntVar(&util.SFI.SampleSize, util.FaceTrackSample, util.SFI.SampleSize, "face track sample size") + flag.BoolVar(&logit, util.LogIt, false, "use logger,default flase") + flag.BoolVar(&shm, util.SHM, false, "use shm for performance") - // 杞﹁締淇℃伅 - // 鎺堟潈鏈嶅姟璺緞 - flag.StringVar(&util.SCI.LicenseServerPath, util.CarSDKLicense, util.SCI.LicenseServerPath, "car detect lic server bin path") - flag.IntVar(&util.SCI.MaxImageWidth, util.CarMaxImageWidth, util.SCI.MaxImageWidth, "car detect image width") - flag.IntVar(&util.SCI.MaxImageHeight, util.CarMaxImageHeight, util.SCI.MaxImageHeight, "car detect image height") - // cloud plate - flag.StringVar(&util.SCI.Model, util.CarSDKModel, util.SCI.Model, "cloud palte detector model") + flag.IntVar(&gpu, util.GPU, -1, "gpu index") // 鎸囧畾鑾峰彇閰嶇疆淇℃伅浠巗qlite,鏈夋渶楂樹紭鍏堢骇, master浣跨敤 flag.StringVar(&util.FSI.IP, util.FetchSrvIP, util.FSI.IP, "fetch server ip, like camera info") @@ -86,19 +70,6 @@ } func setParamters() { - util.FillParams(util.FaceDetectThread, strconv.Itoa(util.SFI.DetectThrd)) - util.FillParams(util.FaceDetectNum, strconv.Itoa(util.SFI.DetectNum)) - util.FillParams(util.FaceDetectAngle, strconv.Itoa(util.SFI.DetectAngle)) - util.FillParams(util.FacePropertyThread, strconv.Itoa(util.SFI.PropThrd)) - util.FillParams(util.FaceExtractThread, strconv.Itoa(util.SFI.ExtractThrd)) - util.FillParams(util.FaceTrackInterval, strconv.Itoa(util.SFI.TrackInterval)) - util.FillParams(util.FaceTrackSample, strconv.Itoa(util.SFI.SampleSize)) - - util.FillParams(util.CarSDKLicense, util.SCI.LicenseServerPath) - util.FillParams(util.CarSDKModel, util.SCI.Model) - util.FillParams(util.CarMaxImageWidth, strconv.Itoa(util.SCI.MaxImageWidth)) - util.FillParams(util.CarMaxImageHeight, strconv.Itoa(util.SCI.MaxImageHeight)) - util.FillParams(util.FetchSrvIP, util.FSI.IP) util.FillParams(util.FetchSrvPort, strconv.Itoa(util.FSI.HTTPort)) util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort)) @@ -106,23 +77,13 @@ util.FillParams(util.RuleIPC, util.ToRuleIPC) - util.SetParams("-so-file=" + soFile) - util.SetParams("-gpu=" + strconv.Itoa(gpu)) + util.FillParams(util.GPU, strconv.Itoa(gpu)) if logit { - util.SetParams("-logit") + util.FillParams(util.LogIt, "true") } if shm { - util.SetParams("-shm") + util.FillParams(util.SHM, "true") } - - // util.FillParams("gpu", strconv.Itoa(gpu)) - // if logit { - // util.FillParams("logit", "true") - // } - // if shm { - // util.FillParams("shm", "true") - // } - } func main() { @@ -143,26 +104,42 @@ runLogger() + if configPath == "" || len(configPath) == 0 { + logo.Infoln("!!!!!! SDK CONFIG PATH MUST EXIST !!!!!!") + return + } logo.Infoln(os.Args) ctx, cancel := context.WithCancel(context.Background()) ret := false - if role == single { - ret = proc.SingleRole(ctx, runType, id, gpu, shm) - } else if role == master { + if role == roleMaster { setParamters() - ret = proc.MasterRole(ctx) - } else if role == slave { - ret = proc.SingleRole(ctx, runType, id, gpu, shm) + ret = master.Run(ctx, configPath) + + } else if role == roleSlave { + ret = slave.Run(ctx, configPath, runType, id, gpu, shm) } if ret { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c + // c := make(chan os.Signal, 1) + // signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + // <-c + select {} } cancel() logo.Close() } + +// const ( +// configFilePath = "/opt/vasystem/config/" +// configFileName = "sdkconfig" +// configFileType = "yaml" +// ) +// viper.SetConfigType(configFileType) +// viper.SetConfigName(configFileName) +// viper.AddConfigPath(configFilePath) +// viper.AddConfigPath("./") + +// envString := "" diff --git a/modulemake.sh b/modulemake.sh index 58e0470..b5138f9 100755 --- a/modulemake.sh +++ b/modulemake.sh @@ -9,12 +9,12 @@ cur_dir=`pwd` src=$cur_dir"/libgowrapper" -runtime_dir=$cur_dir"/runtime" +runtime_dir=$cur_dir"/libs" # runtime_dir not exist, then create if [ ! -d $runtime_dir ];then mkdir $runtime_dir - echo "~~~CREATE RUNTIME LIBRARY DIRECTORY" + echo "~~~CREATE RUNTIME LIBRARY DIRECTORY $runtime_dir" fi cd $src @@ -36,7 +36,7 @@ fi # move to runtime_dir - cp -fr lib$value.so $runtime_dir + mv -f lib$value.so $runtime_dir # move sdk to runtime_dir if [ -d `pwd`/sdk/lib ]; then cp -fr `pwd`/sdk/lib/* $runtime_dir diff --git a/modulepull.sh b/modulepull.sh deleted file mode 100755 index 750f2fe..0000000 --- a/modulepull.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/sh - -cur=`pwd` - -for v in `ls`; do - if [ ! -d $cur"/"$v ]; then - continue - fi - cd $v - git pull - cd .. -done diff --git a/modulepush.sh b/modulepush.sh deleted file mode 100755 index ba2b340..0000000 --- a/modulepush.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/sh - -cur=`pwd` - -for v in `ls`; do - if [ ! -d $cur"/"$v ]; then - continue - fi - cd $v - git add . - git commit -m 'update' - git push - cd .. -done diff --git a/proc/gpu.go b/proc/gpu.go deleted file mode 100644 index 2e192e0..0000000 --- a/proc/gpu.go +++ /dev/null @@ -1,56 +0,0 @@ -package proc - -import ( - "analysis/logo" - - "basic.com/valib/gogpu.git" -) - -func freeGPU(id, needed, reserve int) int { - indices := gogpu.RankGPU() - if len(indices) == 0 { - logo.Errorln("THERE IS NO GPU FOR VALID") - return -2 - } - - gi := id - - if id == -1 { - if !gogpu.SatisfyGPU(indices[0], needed, reserve) { - logo.Errorf("CAN'T SPARE GPU MEMORY NEEDED : %dM, RESERVER: %dM\n", needed, reserve) - gi = -1 - } - gi = indices[0] - } else { - if !gogpu.SatisfyGPU(gi, needed, reserve) { - logo.Errorf("2ND CAN'T SPARE GPU MEMORY NEEDED : %dM, RESERVER: %dM\n", needed, reserve) - gi = indices[0] - if !gogpu.SatisfyGPU(gi, needed, reserve) { - logo.Errorf("3RD CAN'T SPARE GPU MEMORY NEEDED : %dM, RESERVER: %dM\n", needed, reserve) - gi = -1 - } - } - } - return gi -} - -// 浼樺厛涓嶄娇鐢ㄧ1蹇樉鍗� -func priorGPU(lastChoice, needed, reserve int) int { - indices := gogpu.RankGPU() - if len(indices) == 0 { - return -1 - } - - for _, v := range indices { - if v != lastChoice { - if gogpu.SatisfyGPU(v, needed, reserve) { - return v - } - } - } - - if gogpu.SatisfyGPU(lastChoice, needed, reserve) { - return lastChoice - } - return -1 -} diff --git a/proc/master.go b/proc/master.go deleted file mode 100644 index eec37c2..0000000 --- a/proc/master.go +++ /dev/null @@ -1,59 +0,0 @@ -package proc - -import ( - "analysis/logo" - "analysis/util" - "context" - - "github.com/spf13/viper" -) - -func reaper(ctxt context.Context) { - pidChan := make(chan int, 1) - Reap(pidChan) - go waitForRestart(ctxt, pidChan) -} - -// MasterRole master -func MasterRole(ctx context.Context) bool { - reaper(ctx) - - util.InitDBAPI() - - const ( - configFilePath = "/opt/vasystem/config/" - configFileName = "sdkconfig" - configFileType = "yaml" - ) - viper.SetConfigType(configFileType) - viper.SetConfigName(configFileName) - viper.AddConfigPath(configFilePath) - viper.AddConfigPath("./") - - envString := "" - - sdks := util.SDKInfo() - for k, v := range sdks { - - if err := viper.ReadInConfig(); err == nil { - - envString = viper.GetString(v.SdkType) - - } - logo.Errorln("MASTER ANALYSIS START SLAVE TYPE: ", v.SdkType, " SDK CONFIG: ", envString) - - args := []string{ - `-role=slave`, - "-sdk=" + v.SdkType, - "-id=" + v.IpcId, - } - args = append(args, (*util.GetParams())...) - pid, err := runProc(ctx, "./analysis", args, &envString) - - if err != nil { - logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcId, err) - } - logo.Infof("START %d PROC %d SDK %s ID %s\n", k, pid, v.IpcId, v.SdkType) - } - return true -} diff --git a/proc/proc.go b/proc/proc.go deleted file mode 100644 index 7cbaa1f..0000000 --- a/proc/proc.go +++ /dev/null @@ -1,202 +0,0 @@ -package proc - -import ( - "analysis/logo" - "analysis/util" - "analysis/work" - "analysis/work/sdk" - "context" - "os" -) - -type infoSDK struct { - name string - ipc string - s sdk.Engine - shm bool -} - -var ( - mapFunc = make(map[string]sdk.Engine) - infos []infoSDK - fn func([]byte, bool) -) - -const ( - postPull = `_1` - postPush = `_2` - - reservedGMem = 512 - yoloGMem = 2048 - faceGMem = 1024 - faceMustGPU0 = 0 - - cfg = "./data/yolo/cfg/yolov3.cfg" - weights = "./data/yolo/yolov3.weights" - name = "./data/yolo/data/coco.names" -) - -const ( - FDetect = "FaceDetect" - // FExtract = "FaceExtract" - // FProperty = "FaceProperty" - FCompare = "FaceCompare" - FtTract = "FaceTrack" - FtDetect = "FaceTrackDetect" - FtOnly = "FaceTrackOnly" - YDetect = "Yolo" - PlateID = "Plate" - HumanTrack = "HumanTrack" -) - -var ( - SDK = []string{ - YDetect, - FDetect, - FCompare, - PlateID, - HumanTrack, - } -) - -func prepare(name string, gpu int) (string, bool) { - - return initSDK(name, gpu) -} - -func initSDK(name string, gpu int) (string, bool) { - - sdkType := name - - if s, f := util.FindStringInArray(sdkType, SDK); f { - sdkType = s - } else { - logo.Errorln("THERE IS NO THIS SDK TYPE: ", sdkType) - return sdkType, false - } - - thrds := util.SFI.DetectThrd - if thrds < util.SFI.PropThrd { - thrds = util.SFI.PropThrd - } - if thrds < util.SFI.ExtractThrd { - thrds = util.SFI.ExtractThrd - } - - logo.Infoln("PASS GPU INDEX: ", gpu) - - var e sdk.Engine - ret := true - - if sdkType == FDetect { - e, ret = createFaceDetect(gpu, thrds, util.SFI.DetectNum, util.SFI.DetectAngle, util.SFI.TrackInterval, util.SFI.SampleSize) - - } else if sdkType == YDetect { - e, ret = createYolo(gpu) - - } else if sdkType == FtTract { - e, ret = createFaceTrack(gpu, thrds, util.SFI.DetectNum, util.SFI.DetectAngle, util.SFI.TrackInterval, util.SFI.SampleSize) - - } else if sdkType == FCompare { - e = sdk.NewEFExtract(100) - } else if sdkType == PlateID { - e = sdk.NewVehicleDetector(util.SCI.LicenseServerPath, util.SCI.Model, util.SCI.MaxImageWidth, util.SCI.MaxImageHeight) - } else if sdkType == HumanTrack { - e = sdk.NewHumanTracker(gpu, 1, 0) - } - - if ret { - mapFunc[sdkType] = e - } else { - logo.Errorln("CREATE SDK : ", sdkType, " FAILED") - os.Exit(0) - } - return sdkType, ret -} - -func build(fname, id string, shm bool) { - f, ok := mapFunc[fname] - if !ok { - logo.Errorln("sdk has no this func : ", fname) - return - } - - if flag := f.Init(); !flag { - logo.Errorln("create sdk : ", fname, " error") - return - } - - s := infoSDK{fname, id, f, shm} - infos = append(infos, s) -} - -func runAll(ctx context.Context) { - - rule := work.NewToRule(30) - url := util.ToRuleIPC - go rule.Run(ctx, url) - - fn = func(data []byte, valid bool) { - if rule != nil { - rule.Push(data, valid) - } - } - - for _, v := range infos { - go run(ctx, v.s, v.name, v.ipc, v.shm) - } -} - -func run(ctx context.Context, s sdk.Engine, fname, id string, shm bool) { - chRcv := make(chan work.MsgRS) - chSnd := make(chan work.MsgRS) - - ipcRcv := util.GetIpcAddress(shm, id+postPull) - recv := work.NewReciever(ipcRcv, chRcv, shm) - logo.Infoln("RECV IPC: ", ipcRcv) - go recv.Run(ctx) - - ipcSnd := util.GetIpcAddress(shm, id+postPush) - snd := work.NewSender(ipcSnd, chSnd, shm) - snd.ApplyCallbackFunc(fn) - logo.Infoln("SND IPC: ", ipcSnd) - go snd.Run(ctx) - - s.Run(ctx, chRcv, chSnd, fname) -} - -//////////////////////////////////////////////////////////////// -////////////// test -//////////////////////////////////////////////////////////////// -// single run -func runSDK(ctx context.Context, fname, id string, shm bool) { - - f, ok := mapFunc[fname] - if !ok { - logo.Errorln("sdk has no this func : ", fname) - return - } - - if flag := f.Init(); !flag { - logo.Errorln("create sdk : ", fname, " error") - return - } - - chRcv := make(chan work.MsgRS) - chSnd := make(chan work.MsgRS) - - ipcS := `ipc:///tmp/` - - ipcRcv := ipcS + id + postPull - recv := work.NewReciever(ipcRcv, chRcv, shm) - logo.Infoln("\nrecv ipc: ", ipcRcv) - go recv.Run(ctx) - - ipcSnd := ipcS + id + postPush - snd := work.NewSender(ipcSnd, chSnd, shm) - logo.Infoln("\nsnd ipc: ", ipcSnd) - go snd.Run(ctx) - - f.Run(ctx, chRcv, chSnd, fname) - -} diff --git a/proc/reaper.go b/proc/reaper.go deleted file mode 100644 index 5a48028..0000000 --- a/proc/reaper.go +++ /dev/null @@ -1,195 +0,0 @@ -package proc - -import ( - "analysis/logo" - "context" - "os" - "os/exec" - "os/signal" - "syscall" - "time" -) - -type procInfo struct { - cmd *exec.Cmd - env string -} - -var ( - procMap = make(map[int]*procInfo) -) - -func restartProc(ctxt context.Context, pid int) { - info, ok := procMap[pid] - if ok { - err := info.cmd.Wait() - - if err != nil { - logo.Errorln("pid : [", pid, "] quit error: ", err) - } else { - logo.Infoln("pid : [", pid, "] quit") - } - delete(procMap, pid) - runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], &info.env) - } else { - logo.Errorln(pid, " doesn't exist") - } -} - -func quitProc(pid int) { - info, ok := procMap[pid] - if ok { - delete(procMap, pid) - - syscall.Kill(pid, syscall.SIGINT) - info.cmd.Wait() - } else { - logo.Errorln(pid, " doesn't exist") - } -} -func runProc(ctxt context.Context, bin string, args []string, env *string) (int, error) { - cmd := exec.CommandContext(ctxt, bin, args...) - cmd.Env = os.Environ() - cmd.Env = append(cmd.Env, *env) - - pid := -1 - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err := cmd.Start() - if err == nil { - pid = cmd.Process.Pid - procMap[pid] = &procInfo{cmd, *env} - } - return pid, err -} - -type Config struct { - Pid int - Options int - DisablePid1Check bool -} - -// Handle death of child (SIGCHLD) messages. Pushes the signal onto the -// notifications channel if there is a waiter. -func sigChildHandler(notifications chan os.Signal) { - sigs := make(chan os.Signal, 3) - signal.Notify(sigs, syscall.SIGCHLD) - - for { - sig := <-sigs - select { - case notifications <- sig: /* published it. */ - default: - /* - * Notifications channel full - drop it to the - * floor. This ensures we don't fill up the SIGCHLD - * queue. The reaper just waits for any child - * process (pid=-1), so we ain't loosing it!! ;^) - */ - } - } - -} /* End of function sigChildHandler. */ - -// Be a good parent - clean up behind the children. -func reapChildren(config Config, pidChan chan<- int) { - notifications := make(chan os.Signal, 1) - - go sigChildHandler(notifications) - - pid := config.Pid - opts := config.Options - - for { - sig := <-notifications - logo.Infof(" - Received signal %v\n", sig) - for { - var wstatus syscall.WaitStatus - - /* - * Reap 'em, so that zombies don't accumulate. - * Plants vs. Zombies!! - */ - pid, err := syscall.Wait4(pid, &wstatus, opts, nil) - pidChan <- pid - for syscall.EINTR == err { - pid, err = syscall.Wait4(pid, &wstatus, opts, nil) - pidChan <- pid - } - - if syscall.ECHILD == err { - break - } - - logo.Infof(" - Grim reaper cleanup: pid=%d, wstatus=%+v\n", - pid, wstatus) - - } - } - -} /* End of function reapChildren. */ - -/* - * ====================================================================== - * Section: Exported functions - * ====================================================================== - */ - -// Normal entry point for the reaper code. Start reaping children in the -// background inside a goroutine. -func Reap(pidChan chan<- int) { - /* - * Only reap processes if we are taking over init's duties aka - * we are running as pid 1 inside a docker container. The default - * is to reap all processes. - */ - Start(Config{ - Pid: -1, - Options: 0, - DisablePid1Check: true, - }, pidChan) - -} /* End of [exported] function Reap. */ - -// Entry point for invoking the reaper code with a specific configuration. -// The config allows you to bypass the pid 1 checks, so handle with care. -// The child processes are reaped in the background inside a goroutine. -func Start(config Config, pidChan chan<- int) { - /* - * Start the Reaper with configuration options. This allows you to - * reap processes even if the current pid isn't running as pid 1. - * So ... use with caution!! - * - * In most cases, you are better off just using Reap() as that - * checks if we are running as Pid 1. - */ - if !config.DisablePid1Check { - mypid := os.Getpid() - if 1 != mypid { - logo.Errorln(" - Grim reaper disabled, pid not 1\n") - return - } - } - - /* - * Ok, so either pid 1 checks are disabled or we are the grandma - * of 'em all, either way we get to play the grim reaper. - * You will be missed, Terry Pratchett!! RIP - */ - go reapChildren(config, pidChan) - -} /* End of [exported] function Start. */ - -func waitForRestart(ctxt context.Context, pidChan <-chan int) { - - for { - select { - case <-ctxt.Done(): - return - case pid := <-pidChan: - restartProc(ctxt, pid) - default: - time.Sleep(3 * time.Second) - } - } -} diff --git a/proc/sdk.go b/proc/sdk.go deleted file mode 100644 index cf6aa42..0000000 --- a/proc/sdk.go +++ /dev/null @@ -1,49 +0,0 @@ -package proc - -import ( - "analysis/logo" - "analysis/util" - "analysis/work/sdk" - "os" - - "basic.com/valib/gogpu.git" -) - -// gpu淇℃伅,渚濇:妫�娴媡hread,鏁伴噺,瑙掑害,property thread, extract thread, track interval, 鍥剧墖灏哄 -func createFaceDetect(gpuIndex, threads, faceNum, faceAngle, trackInterval, sampleSize int) (sdk.Engine, bool) { - if !gogpu.SatisfyGPU(faceMustGPU0, faceGMem, reservedGMem) { - logo.Errorln("CAN'T USE FACE NEEDED GPU 0 FOR : FaceDetect No GPU Memory") - os.Exit(130) - return nil, false - } - - thrds := threads - if thrds > sdk.MaxFaceDetectThreadNum { - thrds = sdk.MaxFaceDetectThreadNum - } - return sdk.NewEFDetectWithTrack(faceMustGPU0, thrds, faceNum, faceAngle, trackInterval, sampleSize), true -} - -func createFaceTrack(gpuIndex, threads, faceNum, faceAngle, trackInterval, sampleSize int) (sdk.Engine, bool) { - return createFaceDetect(gpuIndex, threads, faceNum, faceAngle, trackInterval, sampleSize) -} - -func createYolo(gpuIndex int) (sdk.Engine, bool) { - - if !util.IsFileExist(cfg) || !util.IsFileExist(weights) || !util.IsFileExist(name) { - logo.Errorln("YOLO CREATE NEED WEIGHTS NAME CFG FILE, BUT NOT FOUND") - return nil, false - } - gi := gpuIndex - if gi == -1 { - gi = priorGPU(faceMustGPU0, yoloGMem, reservedGMem) - if gi == -1 { - logo.Errorln("CAN'T USE GPU FOR : Yolo No GPU Memory") - os.Exit(130) - - return nil, false - } - } - - return sdk.NewYDetectWithTrack(gi, cfg, weights, name), true -} diff --git a/proc/single.go b/proc/single.go deleted file mode 100644 index 2284ea2..0000000 --- a/proc/single.go +++ /dev/null @@ -1,128 +0,0 @@ -package proc - -import ( - "analysis/logo" - "analysis/util" - "context" - "strings" -) - -func slaveProc(ctx context.Context, typ string, id string, gpu int, shm bool) bool { - name, ok := prepare(typ, gpu) - if !ok { - logo.Errorln("SLAVEPROC ERROR: ", name) - return false - } - - if id != util.FakeSdkID { - build(name, id, shm) - runAll(ctx) - } else { - - util.InitDBAPI() - - sdks := util.SDKInfo() - // 棣栧厛杩愯yolo - for k, v := range sdks { - if strings.EqualFold(typ, v.SdkType) { - build(v.SdkType, v.IpcId, shm) - logo.Infof("SINGLE PROC ID %s TYPE %s\n", k, v.IpcId, v.SdkType) - } - } - - runAll(ctx) - } - return true -} - -// SingleRole 鍗曡繘绋� -func SingleRole(ctx context.Context, typ string, ipcID string, gpu int, shm bool) bool { - util.InitDBAPI() - - ret := false - if typ == util.FeatAll { // run all proc per proc - - ret = allProc(ctx, gpu, shm) - } else if typ == util.FeatFace || typ == util.FeatYolo { // run all face sdk - - ret = sdkProc(ctx, typ, gpu, shm) - } else { // run one sdk per proc, typ is facedetect/yolodetect/facecompare etc. id is ipc - ret = slaveProc(ctx, typ, ipcID, gpu, shm) - } - return ret -} - -func sdkProc(ctx context.Context, typ string, gpu int, shm bool) bool { - if typ != util.FeatYolo && typ != util.FeatFace { - logo.Errorf("NO THIS SDK PROC SDKPROC : ", typ) - } - rSDK := []string{ - FDetect, - FCompare, - } - if typ == util.FeatYolo { - rSDK = rSDK[0:0] - rSDK = append(rSDK, YDetect) - } - var res []bool - for _, v := range rSDK { - _, f := prepare(v, gpu) - res = append(res, f) - } - for k, v := range res { - if !v { - logo.Errorln(k, " type proc failed to init") - return false - } - } - - sdks := util.SDKInfo() - - for k, v := range sdks { - for _, s := range rSDK { - if s == v.SdkType { - build(v.SdkType, v.IpcId, shm) - logo.Infof("TYPE PROC ID %s TYPE %s\n", k, v.IpcId, v.SdkType) - } - } - } - - runAll(ctx) - return true -} - -func allProc(ctx context.Context, gpu int, shm bool) bool { - var res []bool - - for _, v := range SDK { - _, f := prepare(v, gpu) - res = append(res, f) - } - - for k, v := range res { - if !v { - logo.Errorln(k, " ALL PROC FAILED TO INIT") - return false - } - } - - sdks := util.SDKInfo() - // 棣栧厛杩愯yolo - for k, v := range sdks { - if v.SdkType != "Yolo" { - continue - } - build(v.SdkType, v.IpcId, shm) - logo.Infof("all proc %d sdk id %s run sdk %s\n", k, v.IpcId, v.SdkType) - } - for k, v := range sdks { - if v.SdkType == "Yolo" { - continue - } - build(v.SdkType, v.IpcId, shm) - logo.Infof("all proc %d sdk id %s run sdk %s\n", k, v.IpcId, v.SdkType) - } - - runAll(ctx) - return true -} diff --git a/sdkconfig.yaml b/sdkconfig.yaml deleted file mode 100644 index 7d7da23..0000000 --- a/sdkconfig.yaml +++ /dev/null @@ -1,3 +0,0 @@ -FaceDetect: LD_LIBRARY_PATH=./libs:/usr/local/cuda-8.0/lib64 -Yolo: LD_LIBRARY_PATH=./libs:/usr/local/cuda-8.0/lib64 -HumanTrack: LD_LIBRARY_PATH=./libs:/usr/local/cuda-10.0/lib64 diff --git a/work/cache/cache.go b/work/cache/cache.go deleted file mode 100644 index 2e8bfe4..0000000 --- a/work/cache/cache.go +++ /dev/null @@ -1,76 +0,0 @@ -package cache - -import ( - "fmt" - "strconv" - - "basic.com/dbapi.git" - "basic.com/pubsub/cache.git/shardmap" - "basic.com/pubsub/protomsg.git" - "basic.com/valib/gopherdiscovery.git" - "github.com/gogo/protobuf/proto" -) - -const ( - PREFIX_TASKSDKRULE = "TASKSDKRULE_" -) - -var cMap *shardmap.ShardMap - -func Init(initChan chan bool, dbIp string, surveyPort int, pubSubPort int) { - cMap = shardmap.New(uint8(32)) - urlSurvey := "tcp://" + dbIp + ":" + strconv.Itoa(surveyPort) - urlPubSub := "tcp://" + dbIp + ":" + strconv.Itoa(pubSubPort) - client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc") - recvMsg := client.HeartBeatMsg() - fmt.Println(<-recvMsg) - - initCacheData(initChan) - - peers, _ := client.Peers() - for b := range peers { - fmt.Println("peerMsg:", b) - updateData(b) - } -} - -func initCacheData(initChan chan bool) { - initTaskSdkRule() - initChan <- true -} - -func initTaskSdkRule() { - var api dbapi.TaskSdkRuleApi - - b, rules := api.FindAllTaskSdkRules() - if b { - if rules != nil { - for _, tRule := range rules { - cMap.Set(PREFIX_TASKSDKRULE+tRule.TaskId, tRule.SdkRules) - } - } - } -} - -func updateData(b []byte) { - newUpdateMsg := &protomsg.DbChangeMessage{} - if err := proto.Unmarshal(b, newUpdateMsg); err != nil { - fmt.Println("dbChangeMsg unmarshal err:", err) - return - } - switch newUpdateMsg.Table { - case protomsg.TableChanged_T_TaskSdkRule: - initTaskSdkRule() - default: - - } -} - -func GetTaskSdkRules(taskId string) []*protomsg.SdkRuleSet { - r, b := cMap.Get(PREFIX_TASKSDKRULE + taskId) - if b { - return r.([]*protomsg.SdkRuleSet) - } else { - return nil - } -} diff --git a/work/common.go b/work/common.go deleted file mode 100644 index 30a0653..0000000 --- a/work/common.go +++ /dev/null @@ -1,71 +0,0 @@ -package work - -import ( - "basic.com/valib/deliver.git" - - "basic.com/pubsub/protomsg.git" -) - -const mode = deliver.PushPull - -// MsgRS msg recv and snd -type MsgRS struct { - Msg protomsg.SdkMessage -} - -func snappyCompress(in []byte) ([]byte, error) { - return in, nil - // out := snappy.Encode(nil, in) - // return out, nil -} - -// Compress compress -func Compress(in []byte) ([]byte, error) { - return in, nil - // return lz4Compress(in) - // return snappyCompress(in) -} - -func lz4Compress(in []byte) ([]byte, error) { - return in, nil - - // out := make([]byte, len(in)) - // ht := make([]int, 64<<10) // buffer for the compression table - // n, err := lz4.CompressBlock(in, out, ht) - // if err != nil { - // logo.Errorln(err) - // return nil, err - // } - // if n >= len(in) { - // logo.Infoln("image is not compressible") - // } - // out = out[:n] // compressed data - // return out, nil -} - -//////////////////////////////////////////////////////////////// -func snappyUncompress(in []byte) ([]byte, error) { - return in, nil - // out, err := snappy.Decode(nil, in) - // return out, err -} - -// UnCompress uncompress -func UnCompress(in []byte) ([]byte, error) { - return in, nil - // return lz4Uncompress(in) - // return snappyUncompress(in) -} - -func lz4Uncompress(in []byte) ([]byte, error) { - return in, nil - - // out := make([]byte, 10*len(in)) - // n, err := lz4.UncompressBlock(in, out) - // if err != nil { - // logo.Errorln(err) - // return nil, err - // } - // out = out[:n] // uncompressed data - // return out, nil -} diff --git a/work/ipcreciever.go b/work/ipcreciever.go deleted file mode 100644 index d6fa105..0000000 --- a/work/ipcreciever.go +++ /dev/null @@ -1,148 +0,0 @@ -package work - -import ( - "analysis/logo" - "context" - - "time" - - "basic.com/valib/deliver.git" - - "basic.com/pubsub/protomsg.git" - "github.com/gogo/protobuf/proto" -) - -// Reciever recv from ipc -type Reciever struct { - ctx context.Context - ipcURL string - chMsg chan MsgRS - - shm bool -} - -// NewReciever new recv -func NewReciever(url string, chMsg chan MsgRS, shm bool) *Reciever { - return &Reciever{ - ipcURL: url, - chMsg: chMsg, - shm: shm, - } -} - -func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) { - for { - select { - case <-ctx.Done(): - return - default: - d := <-data - if len(d) < 100 { - continue - } - // logo.Infoln(len(d), "reciver鏁版嵁") - msg := protomsg.SdkMessage{} - if err := proto.Unmarshal(d, &msg); err != nil { - logo.Errorln(err, " msg 澶勭悊寮傚父") - continue - } - // logo.Infoln("RECV MSG: ", msg.Cid, " TASK: ", msg.Tasklab.Taskid, " SDK: ", msg.Tasklab.Sdkinfos[msg.Tasklab.Index].Sdktype) - outMsg := MsgRS{Msg: msg} - r.chMsg <- outMsg - } - } -} - -func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { - dataChan := make(chan []byte) - - go r.unserilizeProto(ctx, dataChan) - - // t := time.Now() - // sc := 0 - - for { - select { - case <-ctx.Done(): - i.Close() - return - default: - - if r.shm { - if d, err := i.Recv(); err != nil { - i.Close() - logo.Infoln("ANALYSIS RECV ERROR: ", err) - - c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - time.Sleep(time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) - logo.Infoln("ANALYSIS CREATE FAILED : ", err) - } - i = c - logo.Infoln("ANALYSIS CREATE SHM") - } else { - if d != nil { - logo.Infoln("~~~shm recv image:", len(d)) - dataChan <- d - } - } - } else { - if msg, err := i.Recv(); err != nil { - // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) - } else { - logo.Infoln("~~~mangos recv image:", len(msg)) - dataChan <- msg - } - } - - // sc++ - // if sc == 25 { - // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) - // sc = 0 - // t = time.Now() - // } - - } - } -} - -// Run run a IPC client -func (r *Reciever) Run(ctx context.Context) { - if r.shm { - r.runShm(ctx) - } else { - r.run(ctx, deliver.NewClient(mode, r.ipcURL)) - } -} - -func (r *Reciever) runShm(ctx context.Context) { - c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) - logo.Infoln("CLIENT CREATE FAILED : ", err) - } - r.run(ctx, c) -} - -/////////////////////////////////////////////////// - -///////// test - -/////////////////////////////////////////////////// - -// Run2 run a IPC server or client -func (r *Reciever) Run2(ctx context.Context, server bool) { - if server { - r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL)) - } else { - r.run(ctx, deliver.NewClient(deliver.PushPull, r.ipcURL)) - } -} diff --git a/work/ipcsender.go b/work/ipcsender.go deleted file mode 100644 index e77d187..0000000 --- a/work/ipcsender.go +++ /dev/null @@ -1,170 +0,0 @@ -package work - -import ( - "analysis/logo" - - "context" - "time" - - "basic.com/pubsub/protomsg.git" - "basic.com/valib/deliver.git" - - "github.com/gogo/protobuf/proto" -) - -// Sender decoder ingo -type Sender struct { - ipcURL string - chMsg <-chan MsgRS - shm bool - fn func([]byte, bool) -} - -// ApplyCallbackFunc cb -func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) { - if s.fn == nil { - s.fn = f - } -} - -// NewSender Sender -func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool) *Sender { - // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) - return &Sender{ - ipcURL: ipcURL, - chMsg: chMsg, - shm: shm, - fn: nil, - } -} - -func unpackImage(msg MsgRS, fnName string) *protomsg.Image { - // 瑙e帇鑾峰彇浼犲叆鐨勬暟鎹� - bData, err := UnCompress(msg.Msg.Data) - if err != nil { - logo.Errorf("%s uncompress image failed\n", fnName) - return nil - } - // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬 - i := &protomsg.Image{} - err = proto.Unmarshal(bData, i) - if err != nil { - logo.Errorf("%s protobuf decode CameraImage error : %s\n", fnName, err.Error()) - return nil - } - if i.Data == nil { - logo.Errorf("%s protomsg.Image data null\n", fnName) - return nil - } - return i -} - -func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) { - - for { - select { - case <-ctx.Done(): - logo.Infoln("stop Sender") - return - case i := <-s.chMsg: - - d, err := proto.Marshal(&i.Msg) - - if err != nil { - logo.Errorln("protobuf encode ipc sender error: ", err) - continue - } - - data <- d - - if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) { - if s.fn != nil { - - sFlag := true - for _, v := range i.Msg.Tasklab.Sdkinfos { - if len(v.Sdkdata) < 2 { - sFlag = false - break - } - } - s.fn(d, sFlag) - - } - } - } - } -} - -func (s *Sender) run(ctx context.Context, i deliver.Deliver) { - - // go ruleserver.TimeTicker() - - dataChan := make(chan []byte) - go s.serializeProto(ctx, dataChan) - - for { - select { - case <-ctx.Done(): - i.Close() - return - default: - - d := <-dataChan - - if s.shm { - if err := i.Send(d); err != nil { - i.Close() - logo.Infoln("ANALYSIS SENDER ERROR: ", err) - - c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) - for { - if err == nil { - break - } - time.Sleep(time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) - logo.Infoln("CLIENT CREATE FAILED : ", err) - } - i = c - } else { - - } - } else { - err := i.Send(d) - if err != nil { - // logo.Errorln("error sender 2 pubsub: ", err) - } else { - logo.Infoln("mangos send to pubsub len: ", len(d)) - } - } - } - } -} - -// Run run a IPC producer -func (s *Sender) Run(ctx context.Context) { - - if s.shm { - s.runShm(ctx) - } else { - i := deliver.NewClient(mode, s.ipcURL) - if i == nil { - logo.Errorln("sender 2 pubsub nng create error") - return - } - s.run(ctx, i) - } -} - -func (s *Sender) runShm(ctx context.Context) { - c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) - logo.Infoln("CLIENT CREATE FAILED : ", err) - } - s.run(ctx, c) -} diff --git a/work/sdk/facetrack.go b/work/sdk/facetrack.go deleted file mode 100644 index 462db59..0000000 --- a/work/sdk/facetrack.go +++ /dev/null @@ -1,247 +0,0 @@ -package sdk - -/* -#include<stdlib.h> - -#define LANDMARKS_NUM 25 -typedef struct { - int id; - float x; - float y; -} Key_Point; - -typedef struct { - int count; - int view; - Key_Point pts[LANDMARKS_NUM]; -} Face_Landmarks; - -void *resize(void **data, const float fx, const float fy){ - Face_Landmarks* lms = (Face_Landmarks*)(*data); - for(int i = 0; i < lms->count; i++){ - Key_Point kp = lms->pts[i]; - kp.x /= fx; - kp.y /= fy; - lms->pts[i] = kp; - } - - return *data; -} -*/ -import "C" -import ( - "analysis/logo" - "analysis/work" - "context" - "fmt" - "time" - "unsafe" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" - "github.com/gogo/protobuf/proto" -) - -const ( - cacheFrameNum = 3 - trackChnTimeout = time.Duration(10) -) - -func (e *EFDetect) cleanChnStat() { - e.chnLock.Lock() - for i := 0; i < MaxFaceDetectThreadNum; i++ { - e.ftrackChanStats[i] = false - } - e.chnLock.Unlock() -} - -func (e *EFDetect) getAvailableChn() int { - e.chnLock.Lock() - defer e.chnLock.Unlock() - - for i := 0; i < MaxFaceDetectThreadNum; i++ { - if e.ftrackChanStats[i] == false { - e.ftrackChanStats[i] = true - return i - } - } - return -1 -} - -func (e *EFDetect) releaseChn(chn int) { - e.chnLock.Lock() - e.ftrackChanStats[chn] = false - e.chnLock.Unlock() -} - -func (e *EFDetect) detectTrack(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - - for { - select { - case <-ctx.Done(): - e.fnFree(e.handle) - return - case rMsg := <-in: - if !validRemoteMessage(rMsg, typ) { - logo.Errorln("face track validremotemessage invalid") - ejectResult(nil, rMsg, out) - continue - } - - if _, ok := e.ftrackChans[rMsg.Msg.Cid]; ok { - e.ftrackChans[rMsg.Msg.Cid] <- rMsg - } else { - - e.ftrackChans[rMsg.Msg.Cid] = make(chan work.MsgRS, cacheFrameNum) - chn := e.getAvailableChn() - if chn < 0 { - logo.Infof("TOO MUCH CHANNEL") - ejectResult(nil, rMsg, out) - continue - } - e.ftrackChannels[rMsg.Msg.Cid] = chn - - i := unpackImage(rMsg, typ) - if i == nil { - ejectResult(nil, rMsg, out) - continue - } - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - ret := e.fnTrackerResize(e.handle, imgW, imgH, chn) - logo.Infof("ResizeFaceTracker: cid:%s, chan:%d, wXh:%d x %d ,result:%t\n", - rMsg.Msg.Cid, chn, imgW, imgH, ret) - go e.detectTrackOneChn(ctx, e.ftrackChans[rMsg.Msg.Cid], out, typ, chn) - e.ftrackChans[rMsg.Msg.Cid] <- rMsg - } - default: - time.Sleep(time.Millisecond * 100) - } - } -} - -func (e *EFDetect) detectTrackOneChn(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string, dtchn int) { - tm := time.Now() - sc := 0 - logo.Infof("detectTrackOneChn dtchn:%d\n", dtchn) - var curCid string - - for { - select { - case <-ctx.Done(): - return - - case rMsg := <-in: - - if !validRemoteMessage(rMsg, typ) { - ejectResult(nil, rMsg, out) - continue - } - - i := unpackImage(rMsg, typ) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - ejectResult(nil, rMsg, out) - continue - } - - curCid = i.Cid - - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - - fRes := e.fnRun(e.handle, i.Data, imgW, imgH, 3, dtchn) - - var faces []*protomsg.ResultFaceDetect - //灏唖dk杩斿洖鍊艰浆鎹㈡垚protomsg绫诲瀷 - for _, r := range fRes { - - d := r.Info - /// filter rules - sdkid := rMsg.Msg.Tasklab.Sdkinfos[rMsg.Msg.Tasklab.Index].Ipcid - size := (d.RcFace.Right - d.RcFace.Left) * (d.RcFace.Bottom - d.RcFace.Top) - angle := d.FAngle - if !filter(rMsg.Msg.Tasklab.Taskid, sdkid, angle.Confidence, float32(angle.Yaw), int(size)) { - continue - } - /// filter rules - - feat := r.Feat - prop := (*protomsg.ThftResult)(unsafe.Pointer(&r.Prop)) - fpos := tconvert2ProtoFacePos(d) - - //缁勬垚缁撴灉骞跺簭鍒楀寲 - res := &protomsg.ResultFaceDetect{Pos: fpos, Result: prop, Feats: feat} - faces = append(faces, res) - - } - var err error - var data []byte - if len(faces) > 0 { - - facePos := protomsg.ParamFacePos{Faces: faces} - data, err = proto.Marshal(&facePos) - if err != nil { - fmt.Println("fdetect marshal proto face pos error", err) - data = nil - } - } - - ejectResult(data, rMsg, out) - var id, name string - if rMsg.Msg.Tasklab != nil { - id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname - } - logo.Infoln("CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT FACE COUNT: ", len(fRes)) - - sc++ - if sc == 25 { - logo.Infoln("CHAN:%d, FACE RUN 25 FRAME USE TIME: ", dtchn, time.Since(tm)) - sc = 0 - tm = time.Now() - } - - if time.Since(tm) > time.Second { - logo.Infof("CHAN:%d, FACE RUN %d FRAME USE TIME: %v", dtchn, sc, time.Since(tm)) - sc = 0 - tm = time.Now() - } - case <-time.After(trackChnTimeout * time.Second): - logo.Errorln("Timeout to get image, curCid:", curCid) - if curCid != "" { - delete(e.ftrackChans, curCid) - e.releaseChn(dtchn) - } - return - - } - } -} - -func tconvert2ProtoFacePos(dec sdkstruct.CFaceInfo) *protomsg.FacePos { - - crect := dec.RcFace - rect := protomsg.Rect{Left: crect.Left, Top: crect.Top, Right: crect.Right, Bottom: crect.Bottom} - leftEye := (*protomsg.Point)(unsafe.Pointer(&dec.PtLeftEye)) - rightEye := (*protomsg.Point)(unsafe.Pointer(&dec.PtRightEye)) - mouth := (*protomsg.Point)(unsafe.Pointer(&dec.PtMouth)) - nose := (*protomsg.Point)(unsafe.Pointer(&dec.PtNose)) - angle := (*protomsg.FaceAngle)(unsafe.Pointer(&dec.FAngle)) - faceID := uint64(dec.NFaceID) - - facialData := dec.PFacialData[:512] - - // facialData := make([]byte, 512) - // copy(facialData[:], dec.PFacialData[:512]) - - return &protomsg.FacePos{ - RcFace: &rect, - PtLeftEye: leftEye, - PtRightEye: rightEye, - PtMouth: mouth, - PtNose: nose, - FAngle: angle, - Quality: dec.NQuality, - FacialData: facialData, - FaceID: faceID, - } -} diff --git a/work/sdk/fdetect.go b/work/sdk/fdetect.go deleted file mode 100644 index 4798f90..0000000 --- a/work/sdk/fdetect.go +++ /dev/null @@ -1,121 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "context" - "plugin" - "sync" - - "basic.com/libgowrapper/sdkstruct.git" - - "basic.com/valib/gogpu.git" -) - -const MaxFaceDetectThreadNum = 32 - -// EFDetect detect face -type EFDetect struct { - threads int - iGPU int - - // track - faceAngle int - faceNum int - interval int - sampleSize int - - ftrackChans map[string]chan work.MsgRS - ftrackChannels map[string]int - ftrackChanStats []bool - chnLock sync.Mutex - - handle interface{} - fnInit func(func(...interface{})) interface{} - fnFree func(interface{}) - fnRun func(interface{}, []byte, int, int, int, int) []sdkstruct.CFaceResult - fnTrackerResize func(interface{}, int, int, int) bool - fnExtractor func(interface{}, int, int) bool - fnPropertizer func(interface{}, int) bool - fnTracker func(interface{}, int, int, int, int, int, int, int) bool -} - -// NewEFDetectWithTrack with track -func NewEFDetectWithTrack(gi, thrds, faceNum, faceAngle, interval, samp int) *EFDetect { - soFile := "libface.so" - - plug, err := plugin.Open(soFile) - if err != nil { - logo.Errorln("Open: ", soFile, " error: ", err) - return nil - } - - fnInit, _ := LoadFunc(plug, soFile, "NewSDK") - fnFree, _ := LoadFunc(plug, soFile, "Free") - fnRun, _ := LoadFunc(plug, soFile, "Run") - fnTrackerResize, _ := LoadFunc(plug, soFile, "TrackerResize") - fnExtractor, _ := LoadFunc(plug, soFile, "Extractor") - fnPropertizer, _ := LoadFunc(plug, soFile, "Propertizer") - fnTracker, _ := LoadFunc(plug, soFile, "Tracker") - - return &EFDetect{ - threads: thrds, - faceAngle: faceAngle, - ftrackChans: make(map[string]chan work.MsgRS, MaxFaceDetectThreadNum), - ftrackChannels: make(map[string]int, MaxFaceDetectThreadNum), - ftrackChanStats: make([]bool, MaxFaceDetectThreadNum, MaxFaceDetectThreadNum), - faceNum: faceNum, - interval: interval, - iGPU: gi, - sampleSize: samp, - - handle: nil, - fnInit: fnInit.(func(func(...interface{})) interface{}), - fnFree: fnFree.(func(interface{})), - fnRun: fnRun.(func(interface{}, []byte, int, int, int, int) []sdkstruct.CFaceResult), - fnTrackerResize: fnTrackerResize.(func(interface{}, int, int, int) bool), - fnExtractor: fnExtractor.(func(interface{}, int, int) bool), - fnPropertizer: fnPropertizer.(func(interface{}, int) bool), - fnTracker: fnTracker.(func(interface{}, int, int, int, int, int, int, int) bool), - } -} - -// Init impl interface -func (e *EFDetect) Init() bool { - - e.fakeInit() - - e.cleanChnStat() - - return true -} - -func (e *EFDetect) fakeInit() { - gpu := e.iGPU - - if gpu == -1 { - gpu = gogpu.ValidGPU(2048) - } - - e.handle = e.fnInit(logo.Infoln) - - if !e.fnTracker(e.handle, 1280, 720, e.faceNum, e.interval, 720, e.threads, gpu) { - logo.Errorln("FACE TRACKER CREATE ERROR") - } - logo.Infoln("Face Tracker Use GPU: ", gpu) - - if !e.fnPropertizer(e.handle, e.threads) { - logo.Errorln("FACE PROPERTIZER CREATE ERROR") - } - - if !e.fnExtractor(e.handle, e.threads, gpu) { - logo.Errorln("FACE EXTRACTOR CREATE ERROR") - } - - logo.Infoln("Face Extractor Use GPU: ", gpu) -} - -// Run impl interface -func (e *EFDetect) Run(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - e.detectTrack(ctx, in, out, typ) -} diff --git a/work/sdk/fextract.go b/work/sdk/fextract.go deleted file mode 100644 index ea51493..0000000 --- a/work/sdk/fextract.go +++ /dev/null @@ -1,37 +0,0 @@ -package sdk - -import ( - "analysis/work" - "context" -) - -// EFExtract engine face extract -type EFExtract struct { - threads int -} - -// NewEFExtract new -func NewEFExtract(thread int) *EFExtract { - return &EFExtract{thread} -} - -// Init impl interface -func (e *EFExtract) Init() bool { - - return true -} - -// Run impl interface -func (e *EFExtract) Run(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - - for { - select { - case <-ctx.Done(): - return - default: - rMsg := <-in - - ejectResult(nil, rMsg, out) - } - } -} diff --git a/work/sdk/flow.go b/work/sdk/flow.go deleted file mode 100644 index 329c899..0000000 --- a/work/sdk/flow.go +++ /dev/null @@ -1,117 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "container/list" - "context" - "sync" - "time" -) - -// LockList list -type LockList struct { - cache *list.List - cv *sync.Cond - cond bool - size int -} - -// NewLockList new -func NewLockList(size int) *LockList { - return &LockList{ - cache: list.New(), - cv: sync.NewCond(&sync.Mutex{}), - cond: false, - size: size, - } -} - -// Push push -func (l *LockList) Push(v interface{}) { - l.cv.L.Lock() - l.cache.PushBack(v) - - for l.cache.Len() > l.size { - l.cache.Remove(l.cache.Front()) - } - - l.cond = true - l.cv.Signal() - l.cv.L.Unlock() -} - -// Pop pop -func (l *LockList) Pop() interface{} { - l.cv.L.Lock() - - for !l.cond { - l.cv.Wait() - } - - elem := l.cache.Front().Value - - l.cache.Remove(l.cache.Front()) - l.cond = false - l.cv.L.Unlock() - - return elem -} - -///////////////////////////////////////////////////////////////// - -func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string, - fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) { - - tm := time.Now() - sc := 0 - - for { - select { - case <-ctx.Done(): - return - default: - - rMsg := fnConsume().(work.MsgRS) - - fnRun(rMsg, out, typ) - - sc++ - if sc == 25 { - logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) - sc = 0 - tm = time.Now() - } - if time.Since(tm) > time.Second { - logo.Infof("%s RUN %d FRAME USE TIME: %v", typ, sc, time.Since(tm)) - sc = 0 - tm = time.Now() - } - } - } - -} - -// FlowSimple wrap -func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string, - fnProduce func(interface{}), fnConsume func() interface{}, - fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) { - - go flowSimpleWork(ctx, out, typ, fnConsume, fnRun) - - for { - select { - case <-ctx.Done(): - fnClose() - return - default: - rMsg := <-in - if !validRemoteMessage(rMsg, typ) { - logo.Errorln(typ, " validremotemessage invalid") - ejectResult(nil, rMsg, out) - continue - } - fnProduce(rMsg) - } - } -} diff --git a/work/sdk/humantrack.go b/work/sdk/humantrack.go deleted file mode 100644 index f0edec8..0000000 --- a/work/sdk/humantrack.go +++ /dev/null @@ -1,176 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "context" - "plugin" - - "github.com/gogo/protobuf/proto" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" - "basic.com/valib/gogpu.git" -) - -// HumanTracker track -type HumanTracker struct { - gpu int - batchSize int - flag int - - list *LockList - - handle interface{} - fnInit func(int, int, int, func(...interface{})) interface{} - fnFree func(interface{}) - fnRun func(interface{}, []byte, int, int, int) []sdkstruct.FgResult - fnProcess func(interface{}, []sdkstruct.SDKImage) ([]sdkstruct.FgResult, error) - fnVer func(interface{}) string - fnSimilarity func([]float32, []float32) (float64, error) -} - -// NewHumanTracker new -func NewHumanTracker(gpu, batchSize, flag int) *HumanTracker { - soFile := "libhumantrack.so" - - plug, err := plugin.Open(soFile) - if err != nil { - logo.Errorln("Open: ", soFile, " error: ", err) - return nil - } - - fnInit, _ := LoadFunc(plug, soFile, "NewSDK") - fnFree, _ := LoadFunc(plug, soFile, "Free") - fnRun, _ := LoadFunc(plug, soFile, "Run") - fnProc, _ := LoadFunc(plug, soFile, "Process") - fnVer, _ := LoadFunc(plug, soFile, "GetVersion") - fnSim, _ := LoadFunc(plug, soFile, "CFFSimilarity") - - return &HumanTracker{ - gpu: gpu, - batchSize: batchSize, - flag: flag, - - list: NewLockList(6), - - handle: nil, - fnInit: fnInit.(func(int, int, int, func(...interface{})) interface{}), - fnFree: fnFree.(func(interface{})), - fnRun: fnRun.(func(interface{}, []byte, int, int, int) []sdkstruct.FgResult), - fnProcess: fnProc.(func(interface{}, []sdkstruct.SDKImage) ([]sdkstruct.FgResult, error)), - fnVer: fnVer.(func(interface{}) string), - fnSimilarity: fnSim.(func([]float32, []float32) (float64, error)), - } -} - -// Init impl -func (t *HumanTracker) Init() bool { - if t.batchSize != 1 { - logo.Errorln("ONLY SUPPORT BATCH SIZE = 1") - return false - } - gpu := t.gpu - if gpu == -1 { - gpu = gogpu.ValidGPU(2048) - } - h := t.fnInit(gpu, t.batchSize, t.flag, logo.Infoln) - logo.Infoln("HumanTrack USE GPU: ", gpu) - - if h == nil { - logo.Errorln("CREATE HumanTrack DETECTOR ERROR") - return false - } - - t.handle = h - return true -} - -func (t *HumanTracker) track(rMsg work.MsgRS, out chan<- work.MsgRS, typ string) { - i := unpackImage(rMsg, typ) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - ejectResult(nil, rMsg, out) - return - } - - imgW, imgH := int(i.Width), int(i.Height) - - // var images []sdkstruct.SDKImage - // img := sdkstruct.SDKImage{ - // Data: i.Data, - // Width: imgW, - // Height: imgH, - // Channel: 3, - // } - // images = append(images, img) - res := t.fnRun(t.handle, i.Data, imgW, imgH, 3) - if res == nil { - logo.Errorln("HUMAN TRACKER RUN ERROR") - - ejectResult(nil, rMsg, out) - return - } - - hr := convert2ProtoHumanTrackResult(res) - result := protomsg.HumanTrackResult{Result: hr[0]} - data, err := proto.Marshal(&result) - if err != nil { - logo.Errorln("HUMAN TRACKER MARSHAL PROTO ERROR", err) - data = nil - } - ejectResult(data, rMsg, out) - var id, name string - if rMsg.Msg.Tasklab != nil { - id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname - } - - logo.Infoln("CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT YOLO COUNT: ", len(hr[0])) - -} - -// Run impl -func (t *HumanTracker) Run(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - - FlowSimple(ctx, in, out, typ, t.list.Push, t.list.Pop, t.track, func() { t.fnFree(t.handle) }) -} - -// message HumanTrack { -// Rect rcHuman = 1; -// float confidence = 2; -// int32 x = 3; -// int32 y = 4; -// int32 id = 5; -// repeated float feature = 6; -// } - -// message HumanTrackResult { -// repeated HumanTrack result = 1; -// } - -func convert2ProtoHumanTrackResult(obj []sdkstruct.FgResult) [][]*protomsg.HumanTrack { - ret := [][]*protomsg.HumanTrack{} - for _, v := range obj { - res := []*protomsg.HumanTrack{} - for i := 0; i < int(v.FgNum); i++ { - r := v.Fginfo[i] - - rect := protomsg.Rect{ - Left: r.Left, - Right: r.Right, - Top: r.Top, - Bottom: r.Bottom, - } - pr := &protomsg.HumanTrack{ - RcHuman: &rect, - Confidence: r.Confidence, - X: r.X, - Y: r.Y, - Id: r.ID, - Feature: r.Feature[:], - } - res = append(res, pr) - } - ret = append(ret, res) - } - return ret -} diff --git a/work/sdk/interface.go b/work/sdk/interface.go deleted file mode 100644 index 12fbf0f..0000000 --- a/work/sdk/interface.go +++ /dev/null @@ -1,145 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "analysis/work/cache" - "context" - "encoding/binary" - "math" - - "basic.com/pubsub/protomsg.git" - "basic.com/ruleForSdk.git" - - "github.com/gogo/protobuf/proto" - sq "github.com/yireyun/go-queue" -) - -// Engine interface sdk -type Engine interface { - Init() bool - Run(context.Context, <-chan work.MsgRS, chan<- work.MsgRS, string) -} - -func createQueue(length int) *sq.EsQueue { - q := sq.NewQueue(uint32(length)) - for i := 0; i < length; i++ { - q.Put(i) - } - return q -} - -func ejectResult(res []byte, msg work.MsgRS, out chan<- work.MsgRS) { - //鍘嬬缉鏁版嵁 - // rData, err := work.Compress(res) - // if err != nil { - // logo.Errorln("compress result error: ", err) - // return - // } - - if res == nil { - out <- msg - return - } - index := int(msg.Msg.Tasklab.Index) - - if index >= len(msg.Msg.Tasklab.Sdkinfos) { - return - } - - // var tmpD []byte - // var tmpE error - // tmpD, tmpE = proto.Marshal(&msg.Msg) - // if tmpE == nil { - // logo.Infoln(msg.Msg.Tasklab.Sdkinfos[index].Sdktype, " orig SDK DATA: ", len(tmpD), " index :", index) - // } - - msg.Msg.Tasklab.Sdkinfos[index].Sdkdata = res - - // tmpD, tmpE = proto.Marshal(&msg.Msg) - // if tmpE == nil { - // logo.Infoln(msg.Msg.Tasklab.Sdkinfos[index].Sdktype, " after SDK DATA: ", len(tmpD), " index :", index) - // } - - // if index == 1{ - // logo.Infoln(msg.Msg.Tasklab.Sdkinfos[index].Sdktype, " orig SDK DATA: ", len(res), " index :", index) - // } - - out <- msg -} - -func byteToFloat32(bytes []byte) float32 { - bits := binary.LittleEndian.Uint32(bytes) - return math.Float32frombits(bits) -} - -func float32ToByte(float float32) []byte { - bits := math.Float32bits(float) - bytes := make([]byte, 4) - binary.LittleEndian.PutUint32(bytes, bits) - return bytes -} - -///////////////////////////////////////////////////////////// -func validRemoteMessage(msg work.MsgRS, fnName string) bool { - if msg.Msg.Tasklab == nil { - logo.Errorf("%s recieve msg nil\n", fnName) - return false - } - - sdkLen := len(msg.Msg.Tasklab.Sdkinfos) - if sdkLen == 0 { - logo.Errorf("%s has no sdk info\n", fnName) - return false - } - - curIndex := int(msg.Msg.Tasklab.Index) - if curIndex < 0 || curIndex >= sdkLen { - logo.Errorf("%s tasklab index %d error\n", fnName, curIndex) - return false - } - if msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName { - logo.Errorf("%s is different from %s\n", fnName, msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype) - return false - } - return true -} - -func unpackImage(msg work.MsgRS, fnName string) *protomsg.Image { - // 瑙e帇鑾峰彇浼犲叆鐨勬暟鎹� - bData, err := work.UnCompress(msg.Msg.Data) - if err != nil { - logo.Errorf("%s uncompress image failed\n", fnName) - return nil - } - // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬 - i := &protomsg.Image{} - err = proto.Unmarshal(bData, i) - if err != nil { - logo.Errorf("%s protobuf decode CameraImage error : %s\n", fnName, err.Error()) - return nil - } - if i.Data == nil { - logo.Errorf("%s protomsg.Image data null\n", fnName) - return nil - } - return i -} - -//////////////////////////////////////////////////////////// -func filter(tid, sid string, score, angle float32, size int) bool { - return true - data := ruleForSdk.TargetData{ - TaskId: tid, - SdkId: sid, - Score: score * 100.0, - Size: size, - Angle: angle, - } - rules := cache.GetTaskSdkRules(data.TaskId) - if rules == nil { - return true - } - logo.Infoln("TASKSDKRULE: ", rules) - return ruleForSdk.Judge(data, rules) -} diff --git a/work/sdk/plugin.go b/work/sdk/plugin.go deleted file mode 100644 index a831b14..0000000 --- a/work/sdk/plugin.go +++ /dev/null @@ -1,15 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "plugin" -) - -// LoadFunc load -func LoadFunc(plug *plugin.Plugin, soFile, fnName string) (plugin.Symbol, error) { - fn, err := plug.Lookup(fnName) - if err != nil { - logo.Errorln("Loopup Func: ", fnName, " From: ", soFile, " Error: ", err) - } - return fn, err -} diff --git a/work/sdk/vdetect.go b/work/sdk/vdetect.go deleted file mode 100644 index 2ee2903..0000000 --- a/work/sdk/vdetect.go +++ /dev/null @@ -1,158 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "context" - "fmt" - "plugin" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" - "github.com/gogo/protobuf/proto" -) - -// VehicleDetector plate id -type VehicleDetector struct { - licSrvPath string - modelPath string - - list *LockList - - handle interface{} - fnInit func(int, int, string, string, func(...interface{})) interface{} - fnFree func(interface{}) - fnRun func(interface{}, []byte, int, int, int, int, int, int, int) []sdkstruct.CVehicleITSResult -} - -// NewVehicleDetector plate -func NewVehicleDetector(licSrv, model string, w, h int) *VehicleDetector { - soFile := "libvehicle.so" - - plug, err := plugin.Open(soFile) - if err != nil { - logo.Errorln("Open: ", soFile, " error: ", err) - return nil - } - - fnInit, _ := LoadFunc(plug, soFile, "NewSDK") - fnFree, _ := LoadFunc(plug, soFile, "Free") - fnRun, _ := LoadFunc(plug, soFile, "Run") - - return &VehicleDetector{ - licSrvPath: licSrv, - modelPath: model, - - list: NewLockList(6), - - handle: nil, - fnInit: fnInit.(func(int, int, string, string, func(...interface{})) interface{}), - fnFree: fnFree.(func(interface{})), - fnRun: fnRun.(func(interface{}, []byte, int, int, int, int, int, int, int) []sdkstruct.CVehicleITSResult), - } -} - -// Init impl -func (d *VehicleDetector) Init() bool { - h := d.fnInit(2, 0, d.licSrvPath, d.modelPath, logo.Infoln) - if h == nil { - logo.Errorln("INIT VEHICLE SDK ERROR") - return false - } - d.handle = h - - logo.Infoln("RUN VEHICLE SDK") - - return true -} - -func (d *VehicleDetector) detect(rMsg work.MsgRS, out chan<- work.MsgRS, typ string) { - i := unpackImage(rMsg, typ) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - ejectResult(nil, rMsg, out) - return - } - - imgW, imgH := int(i.Width), int(i.Height) - - vehicle := d.fnRun(d.handle, i.Data, imgW, imgH, 3, 0, 0, imgW, imgH) - plateids := convert2ProtoPlateIDResultVehicle(vehicle) - - plateresult := protomsg.PlateIDResult{Result: plateids} - data, err := proto.Marshal(&plateresult) - if err != nil { - fmt.Println("PLATE ID DETECTOR MARSHAL PROTO PLATE IDS ERROR", err) - data = nil - } - - ejectResult(data, rMsg, out) - - var id, name string - if rMsg.Msg.Tasklab != nil { - id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname - } - logo.Infoln("CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT VEHICLE PLATE ID COUNT: ", len(vehicle)) - -} - -// Run impl -func (d *VehicleDetector) Run(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - FlowSimple(ctx, in, out, typ, d.list.Push, d.list.Pop, d.detect, func() { d.fnFree(d.handle) }) -} - -func convert2ProtoPlateIDResultVehicle(obj []sdkstruct.CVehicleITSResult) []*protomsg.PlateIDVehicle { - ret := []*protomsg.PlateIDVehicle{} - - for _, v := range obj { - - rcPlateID := &protomsg.Rect{ - Left: v.RcLocation.Left, - Right: v.RcLocation.Right, - Top: v.RcLocation.Top, - Bottom: v.RcLocation.Bottom, - } - rcCar := &protomsg.Rect{ - Left: v.RcCarLocation.Left, - Right: v.RcCarLocation.Right, - Top: v.RcCarLocation.Top, - Bottom: v.RcCarLocation.Bottom, - } - - lic := string(v.License[:]) - end := len(lic) - for i := len(lic) - 1; i >= 0; i-- { - if lic[i] != '\000' { - end = i + 1 - break - } - } - if end > 0 { - lic = lic[:end] - } - - obj := &protomsg.PlateIDVehicle{ - NPlateFlag: v.NPlateFlag, - NColor: v.NColor, - NType: v.NType, - License: lic, - NConfidence: v.NConfidence, - NCharNum: v.NCharNum, - NCharConfidence: v.NCharConfidence[:], - RcLocation: rcPlateID, - RcCarLocation: rcCar, - VehicleType: string(v.VehicleType[:]), - VehicleBrand: string(v.VehicleBrand[:]), - VehicleSub: string(v.VehicleSub[:]), - VehicleType1: string(v.VehicleType1[:]), - FvdConf: v.FVDConf, - FConfdence: v.FConfdence, - NVehicleSubModel: v.NVehicleSubModel, - NVehicleBright: v.NVehicleBright, - NVehicleColor1: v.NVehicleColor1, - NVehicleColor2: v.NVehicleColor2, - } - - ret = append(ret, obj) - } - return ret -} diff --git a/work/sdk/ydetect.go b/work/sdk/ydetect.go deleted file mode 100644 index 84364f9..0000000 --- a/work/sdk/ydetect.go +++ /dev/null @@ -1,147 +0,0 @@ -package sdk - -import ( - "analysis/logo" - "analysis/work" - "context" - "fmt" - "plugin" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" - "basic.com/valib/gogpu.git" - "github.com/gogo/protobuf/proto" -) - -// YoloDetect yolo detect -type YoloDetect struct { - iGPU int - cfg string - weights string - name string - - list *LockList - - handle interface{} - fnInit func(string, string, string, int, func(...interface{})) interface{} - fnFree func(interface{}) - fnRun func(interface{}, string, []byte, int, int, int, float32, int) ([]sdkstruct.CObjTrackInfo, []sdkstruct.CObjTrackInfo) -} - -// NewYDetectWithTrack with track -func NewYDetectWithTrack(gi int, cfg, weights, name string) *YoloDetect { - soFile := "libyolo.so" - - plug, err := plugin.Open(soFile) - if err != nil { - logo.Errorln("Open: ", soFile, " error: ", err) - return nil - } - - fnInit, _ := LoadFunc(plug, soFile, "NewSDK") - fnFree, _ := LoadFunc(plug, soFile, "Free") - fnRun, _ := LoadFunc(plug, soFile, "Run") - - return &YoloDetect{ - iGPU: gi, - cfg: cfg, - weights: weights, - name: name, - - list: NewLockList(6), - - handle: nil, - fnInit: fnInit.(func(string, string, string, int, func(...interface{})) interface{}), - fnFree: fnFree.(func(interface{})), - fnRun: fnRun.(func(interface{}, string, []byte, int, int, int, float32, int) ([]sdkstruct.CObjTrackInfo, []sdkstruct.CObjTrackInfo)), - } -} - -// Init impl interface -func (y *YoloDetect) Init() bool { - - gpu := y.iGPU - if gpu == -1 { - gpu = gogpu.ValidGPU(2048) - } - h := y.fnInit(y.cfg, y.weights, y.name, gpu, logo.Infoln) - logo.Infoln("YOLO USE GPU: ", gpu) - - if h == nil { - logo.Errorln("CREATE YOLO DETECTOR ERROR") - return false - } - - y.handle = h - return true -} - -// Run impl interface -func (y *YoloDetect) Run(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string) { - FlowSimple(ctx, in, out, typ, y.list.Push, y.list.Pop, y.track, func() { y.fnFree(y.handle) }) -} - -func (y *YoloDetect) track(rMsg work.MsgRS, out chan<- work.MsgRS, typ string) { - - ///////////////////////////////////////////// - i := unpackImage(rMsg, typ) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - logo.Errorln("yolo image error: ", i) - ejectResult(nil, rMsg, out) - - return - } - - imgW, imgH := int(i.Width), int(i.Height) - - whole, recent := y.fnRun(y.handle, rMsg.Msg.Cid, i.Data, imgW, imgH, 3, 0.4, 0) - if len(recent) > 0 { - } - - infos := convert2ProtoYoloTrack(whole, 1.0, 1.0) - p := protomsg.ParamYoloObj{Infos: infos} - - data, err := proto.Marshal(&p) - if err != nil { - fmt.Println("ydetect track marshal proto yolo obj error", err) - data = nil - } - - ejectResult(data, rMsg, out) - - var id, name string - if rMsg.Msg.Tasklab != nil { - id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname - } - - logo.Infoln("CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT YOLO COUNT: ", len(whole)) -} - -func convert2ProtoYoloTrack(obj []sdkstruct.CObjTrackInfo, fx, fy float64) []*protomsg.ObjInfo { - ret := []*protomsg.ObjInfo{} - - for _, v := range obj { - if fx < 1.0 || fy < 1.0 { - v.ObjInfo.RcObj.Left = (int32)((float64)(v.ObjInfo.RcObj.Left) / fx) - v.ObjInfo.RcObj.Right = (int32)((float64)(v.ObjInfo.RcObj.Right) / fx) - v.ObjInfo.RcObj.Top = (int32)((float64)(v.ObjInfo.RcObj.Top) / fy) - v.ObjInfo.RcObj.Bottom = (int32)((float64)(v.ObjInfo.RcObj.Bottom) / fy) - } - - rect := protomsg.Rect{ - Left: v.ObjInfo.RcObj.Left, - Right: v.ObjInfo.RcObj.Right, - Top: v.ObjInfo.RcObj.Top, - Bottom: v.ObjInfo.RcObj.Bottom, - } - obj := protomsg.ObjInfo{ - RcObj: &rect, - Typ: v.ObjInfo.Typ, - Prob: v.ObjInfo.Prob, - ObjID: v.ID, - } - - ret = append(ret, &obj) - } - return ret -} diff --git a/work/torule.go b/work/torule.go deleted file mode 100644 index 02e7ce1..0000000 --- a/work/torule.go +++ /dev/null @@ -1,125 +0,0 @@ -package work - -import ( - "analysis/logo" - "container/list" - "context" - "sync" - "time" - - "basic.com/valib/deliver.git" - // "basic.com/pubsub/protomsg.git" - // "github.com/gogo/protobuf/proto" -) - -type runResult struct { - data []byte - valid bool -} - -// ToRule ipc -type ToRule struct { - maxSize int - cache *list.List - cv *sync.Cond - cond bool -} - -// NewToRule send to ruleprocess -func NewToRule(maxSize int) *ToRule { - return &ToRule{ - maxSize: maxSize, - cache: list.New(), - cv: sync.NewCond(&sync.Mutex{}), - cond: false, - } -} - -// Push data -func (t *ToRule) Push(data []byte, valid bool) { - t.cv.L.Lock() - result := runResult{data, valid} - t.cache.PushBack(result) - if t.cache.Len() > t.maxSize { - for i := 0; i < t.cache.Len(); { - d := t.cache.Front().Value.(runResult) - if d.valid == false { - t.cache.Remove(t.cache.Front()) - i = i + 2 - } else { - i = i + 1 - } - } - } - if t.cache.Len() > t.maxSize { - for i := 0; i < t.cache.Len(); { - t.cache.Remove(t.cache.Front()) - i = i + 2 - } - } - // logo.Infof("push to cache count : %d\n", t.cache.Len()) - t.cond = true - t.cv.Signal() - t.cv.L.Unlock() -} - -// Run forever -func (t *ToRule) Run(ctx context.Context, ipcAddr string) { - var i deliver.Deliver - var err error - - for { - i, err = deliver.NewClientWithError(deliver.PushPull, ipcAddr) - if err != nil { - time.Sleep(time.Second) - logo.Errorln("wait create to rule ipc", err) - continue - } - break - } - - for { - select { - case <-ctx.Done(): - return - default: - - var d []byte - t.cv.L.Lock() - - for !t.cond { - t.cv.Wait() - } - - for j := 0; j < 8; j++ { - if t.cache.Len() <= 0 { - break - } - - d = t.cache.Front().Value.(runResult).data - if i != nil && d != nil { - - err := i.Send(d) - if err != nil { - logo.Errorln("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err) - } else { - logo.Infoln("~~~~~~SEND TO RULE CORRECT") - // msg := protomsg.SdkMessage{} - // if err := proto.Unmarshal(d, &msg); err != nil { - // logo.Errorln(err, " msg 澶勭悊寮傚父") - // continue - // } - // for _, v := range msg.Tasklab.Sdkinfos { - // logo.Infof("%d SDK DATA SEND TO RULE PROCESS CAMERA ID %s TASKID: %s, SKD %s, LEN: %d\n", len(msg.Tasklab.Sdkinfos), msg.Cid, msg.Tasklab.Taskid, v.Sdktype, len(v.Sdkdata)) - // } - } - } - t.cache.Remove(t.cache.Front()) - } - - t.cond = false - t.cv.L.Unlock() - - } - } -} -- Gitblit v1.8.0