From c804940ec5bcdcfa0ffe90e03c6866d3bb651416 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 14 一月 2020 17:13:21 +0800 Subject: [PATCH] add chan cache for async --- run.go | 65 ++++++++++++++++++++++++++++++-- 1 files changed, 60 insertions(+), 5 deletions(-) diff --git a/run.go b/run.go index f7e9e12..bba0c82 100644 --- a/run.go +++ b/run.go @@ -2,6 +2,7 @@ import ( "context" + "os" "sync" "time" @@ -11,6 +12,8 @@ "basic.com/pubsub/protomsg.git" "basic.com/valib/gogpu.git" ) + +const maxTryBeforeReboot = 10 type face struct { handle *SDKFace @@ -28,11 +31,45 @@ shm bool ipc2Rule string ruleMsgMaxCacheSize int - reserved map[string]string + reserved map[string]interface{} + + running bool + rebootUntil int + mtxRunning sync.Mutex +} + +func (f *face) maybeReboot(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + f.mtxRunning.Lock() + running := f.running + f.mtxRunning.Unlock() + + if running { + f.rebootUntil = 0 + + f.mtxRunning.Lock() + f.running = false + f.mtxRunning.Unlock() + + } else { + f.rebootUntil++ + f.fnLogger("Face No Running: ", f.rebootUntil) + if f.rebootUntil > maxTryBeforeReboot { + f.fnLogger("Face Too Long Running, Reboot") + os.Exit(0) + } + } + time.Sleep(time.Second) + } + } } // Create create sdk -func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{} { +func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} { handle := NewSDK(fn) if handle == nil { @@ -114,6 +151,15 @@ ipc2Rule: ipc2Rule, ruleMsgMaxCacheSize: ruleMaxSize, reserved: reserved, + + running: true, + rebootUntil: maxTryBeforeReboot, + } +} + +func (f *face) release() { + if f.handle != nil { + f.handle.Free() } } @@ -127,8 +173,8 @@ ) ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull) ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush) - chRcv := make(chan []byte) - chSnd := make(chan sdkstruct.MsgSDK) + chRcv := make(chan []byte, s.maxChannel) + chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel) rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger) snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger) @@ -141,6 +187,8 @@ go torule.Run(ctx) go s.run(ctx, chRcv, chSnd) + + go s.maybeReboot(ctx) } ////////////////////////////////////////////////////////////////// @@ -194,6 +242,7 @@ } if _, ok := f.ftrackChans[rMsg.Cid]; ok { + f.fnLogger("Face Cache Size: ", len(f.ftrackChans)) f.ftrackChans[rMsg.Cid] <- rMsg } else { @@ -253,14 +302,20 @@ // conv to bgr24 and resize imgW, imgH := int(i.Width), int(i.Height) + f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Cid) + count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn) sdkhelper.EjectResult(data, rMsg, out) + f.mtxRunning.Lock() + f.running = true + f.mtxRunning.Unlock() + var id, name string if rMsg.Tasklab != nil { id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname } - f.fnLogger("CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) + f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) sc++ if sc == 25 { -- Gitblit v1.8.0