From 5aefcc3832d9a1660ec9b04aa4cf2a69ced1a84d Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 13 一月 2020 11:01:11 +0800 Subject: [PATCH] debug --- run.go | 165 ++++++++++++++++++++++++++++++++----------------------- 1 files changed, 96 insertions(+), 69 deletions(-) diff --git a/run.go b/run.go index e1f3cbd..f915794 100644 --- a/run.go +++ b/run.go @@ -14,7 +14,6 @@ type face struct { handle *SDKFace - list *sdkhelper.LockList maxChannel int ftrackChans map[string]chan protomsg.SdkMessage @@ -101,7 +100,6 @@ return &face{ handle: handle, - list: sdkhelper.NewLockList(maxChan + maxChan/2), maxChannel: maxChan, ftrackChans: make(map[string]chan protomsg.SdkMessage, maxChan), @@ -119,52 +117,114 @@ } } -func (f *face) release() { - if f.handle != nil { - f.handle.Free() - } -} - // Run run func Run(ctx context.Context, i interface{}) { s := i.(*face) - chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger) - go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger) + const ( + postPull = `_1` + postPush = `_2` + ) + ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull) + ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush) + chRcv := make(chan []byte) + chSnd := make(chan sdkstruct.MsgSDK) + + rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger) + snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger) + torule := sdkhelper.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger) + + snder.ApplyCallbackFunc(torule.Push) + + go rcver.Run(ctx) + go snder.Run(ctx) + go torule.Run(ctx) + + go s.run(ctx, chRcv, chSnd) } -func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) { +////////////////////////////////////////////////////////////////// +const ( + cacheFrameNum = 3 + trackChnTimeout = time.Duration(10) +) - for _, msg := range msgs { - if _, ok := f.ftrackChans[msg.Cid]; ok { - f.ftrackChans[msg.Cid] <- msg - } else { +func (f *face) cleanChnStat() { + f.chnLock.Lock() + for i := 0; i < f.maxChannel; i++ { + f.ftrackChanStats[i] = false + } + f.chnLock.Unlock() +} - f.ftrackChans[msg.Cid] = make(chan protomsg.SdkMessage, cacheFrameNum) - chn := f.getAvailableChn() - if chn < 0 { - f.fnLogger("TOO MUCH CHANNEL") - sdkhelper.EjectResult(nil, msg, out) +func (f *face) getAvailableChn() int { + f.chnLock.Lock() + defer f.chnLock.Unlock() + + for i := 0; i < f.maxChannel; i++ { + if f.ftrackChanStats[i] == false { + f.ftrackChanStats[i] = true + return i + } + } + return -1 +} + +func (f *face) releaseChn(chn int) { + f.chnLock.Lock() + f.ftrackChanStats[chn] = false + f.chnLock.Unlock() +} + +func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) { + + chMsg := make(chan protomsg.SdkMessage) + go sdkhelper.UnserilizeProto(ctx, in, chMsg, f.fnLogger) + + for { + select { + case <-ctx.Done(): + f.handle.Free() + return + case rMsg := <-chMsg: + if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { + f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID") + sdkhelper.EjectResult(nil, rMsg, out) continue } - f.ftrackChannels[msg.Cid] = chn - i := sdkhelper.UnpackImage(msg, f.typ, f.fnLogger) - if i == nil { - sdkhelper.EjectResult(nil, msg, out) - continue + if _, ok := f.ftrackChans[rMsg.Cid]; ok { + f.ftrackChans[rMsg.Cid] <- rMsg + } else { + + f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, cacheFrameNum) + chn := f.getAvailableChn() + if chn < 0 { + f.fnLogger("TOO MUCH CHANNEL") + sdkhelper.EjectResult(nil, rMsg, out) + continue + } + f.ftrackChannels[rMsg.Cid] = chn + + i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) + if i == nil { + sdkhelper.EjectResult(nil, rMsg, out) + continue + } + // conv to bgr24 and resize + imgW, imgH := int(i.Width), int(i.Height) + ret := f.handle.TrackerResize(imgW, imgH, chn) + f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret) + go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn) + f.ftrackChans[rMsg.Cid] <- rMsg } - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - ret := f.handle.TrackerResize(imgW, imgH, chn) - f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret) - go f.detectTrackOneChn(f.ftrackChans[msg.Cid], out, chn) - f.ftrackChans[msg.Cid] <- msg + default: + time.Sleep(time.Millisecond * 100) } } } -func (f *face) detectTrackOneChn(in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) { +func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) { tm := time.Now() sc := 0 f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn) @@ -172,20 +232,19 @@ for { select { + case <-ctx.Done(): + return + case rMsg := <-in: if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { sdkhelper.EjectResult(nil, rMsg, out) - f.fnLogger("Face!!!!!!SkdMessage Invalid") - continue } i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { sdkhelper.EjectResult(nil, rMsg, out) - f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed") - continue } @@ -222,39 +281,7 @@ f.releaseChn(dtchn) } return + } } -} - -////////////////////////////////////////////////////////////////// -const ( - cacheFrameNum = 3 - trackChnTimeout = time.Duration(10) -) - -func (f *face) cleanChnStat() { - f.chnLock.Lock() - for i := 0; i < f.maxChannel; i++ { - f.ftrackChanStats[i] = false - } - f.chnLock.Unlock() -} - -func (f *face) getAvailableChn() int { - f.chnLock.Lock() - defer f.chnLock.Unlock() - - for i := 0; i < f.maxChannel; i++ { - if f.ftrackChanStats[i] == false { - f.ftrackChanStats[i] = true - return i - } - } - return -1 -} - -func (f *face) releaseChn(chn int) { - f.chnLock.Lock() - f.ftrackChanStats[chn] = false - f.chnLock.Unlock() } -- Gitblit v1.8.0