From 2ff2fb5539ebc07fac8326651c6afc6bd9c374cb Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 13 一月 2020 10:36:36 +0800 Subject: [PATCH] debug --- run.go | 152 ++++++++++++++++++++++++++------------------------ 1 files changed, 80 insertions(+), 72 deletions(-) diff --git a/run.go b/run.go index 023e949..c94aeff 100644 --- a/run.go +++ b/run.go @@ -14,13 +14,8 @@ type channel struct { index int - msgs *sdkhelper.LockList + life int } - -////////////////////////////////////////////////////////////////// -const ( - trackChnTimeout = time.Duration(10) -) func (f *face) channelAvailable() int { f.mapChanLock.Lock() @@ -41,13 +36,6 @@ return -1 } -func (f *face) channelRelease(id string) { - f.mapChanLock.Lock() - defer f.mapChanLock.Unlock() - - delete(f.mapChan, id) -} - func (f *face) channelExist(id string) (*channel, bool) { f.mapChanLock.Lock() defer f.mapChanLock.Unlock() @@ -66,7 +54,7 @@ f.mapChanLock.Lock() c := &channel{ index: index, - msgs: sdkhelper.NewLockList(3), + life: f.maxLife, } f.mapChan[id] = c f.mapChanLock.Unlock() @@ -74,11 +62,39 @@ return c } +func (f *face) maintainChannel(ctx context.Context, ch <-chan string) { + for { + select { + case <-ctx.Done(): + return + case id := <-ch: + f.mapChanLock.Lock() + defer f.mapChanLock.Unlock() + for k, v := range f.mapChan { + v.life-- + if k == id { + v.life = f.maxLife + } + } + for k, v := range f.mapChan { + if v.life < 0 { + delete(f.mapChan, k) + } + } + default: + time.Sleep(10 * time.Millisecond) + } + } +} + type face struct { handle *SDKFace list *sdkhelper.LockList - maxChannel int + maxChannel int + + maxLife int + chLife chan string mapChan map[string]*channel mapChanLock sync.Mutex @@ -163,6 +179,8 @@ handle: handle, list: sdkhelper.NewLockList(maxChan + maxChan/2), + maxLife: trckInterval * 10, + chLife: make(chan string, maxChan), maxChannel: maxChan, mapChan: make(map[string]*channel, maxChan), @@ -187,16 +205,24 @@ func Run(ctx context.Context, i interface{}) { s := i.(*face) + go s.maintainChannel(ctx, s.chLife) + chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger) go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger) } func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) { + wg := &sync.WaitGroup{} + for _, msg := range msgs { + f.fnLogger("######Maintain :", msg.Cid) + f.chLife <- msg.Cid + f.fnLogger("######Maintain Over :", msg.Cid) + if c, ok := f.channelExist(msg.Cid); ok { - f.fnLogger("######recv from :", msg.Cid, " Chan:", c.index) - c.msgs.Push(msg) + wg.Add(1) + go f.track(wg, &msg, c.index, out) } else { nc := f.channelNew(msg.Cid) @@ -215,69 +241,51 @@ imgW, imgH := int(i.Width), int(i.Height) ret := f.handle.TrackerResize(imgW, imgH, nc.index) f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", nc.index, " wXh: ", imgW, "x", imgH, " result:", ret) - go f.detectTrackOneChn(nc, out) - nc.msgs.Push(msg) + wg.Add(1) + go f.track(wg, &msg, nc.index, out) } } + wg.Wait() } -func (f *face) detectTrackOneChn(c *channel, out chan<- sdkstruct.MsgSDK) { +func (f *face) track(wg *sync.WaitGroup, msg *protomsg.SdkMessage, tchan int, out chan<- sdkstruct.MsgSDK) { - dtchn := c.index - var curCid string + defer wg.Done() - for { - select { + rMsg := *msg - case <-time.After(trackChnTimeout * time.Second): - f.fnLogger("######Timeout to get image, curCid:", curCid) - if curCid != "" { - f.channelRelease(curCid) - } - return - default: + f.fnLogger("######Recv From: ", rMsg.Cid, " Chan: ", tchan) - elems := c.msgs.Drain() - var msgs []protomsg.SdkMessage - for _, v := range elems { - msgs = append(msgs, v.(protomsg.SdkMessage)) - } - for _, rMsg := range msgs { + if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { + sdkhelper.EjectResult(nil, rMsg, out) + f.fnLogger("Face!!!!!!SkdMessage Invalid: ", tchan) - if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { - sdkhelper.EjectResult(nil, rMsg, out) - f.fnLogger("Face!!!!!!SkdMessage Invalid: ", dtchn) - - continue - } - - i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - sdkhelper.EjectResult(nil, rMsg, out) - f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", dtchn) - - continue - } - - curCid = i.Cid - - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - - f.fnLogger("######run cid:", rMsg.Cid, "chan: ", dtchn) - count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn) - f.fnLogger("######over run cid:", rMsg.Cid, "chan: ", dtchn) - - f.fnLogger("######eject cid:", rMsg.Cid, "chan: ", dtchn) - sdkhelper.EjectResult(data, rMsg, out) - f.fnLogger("######over eject cid:", rMsg.Cid, "chan: ", dtchn) - - var id, name string - if rMsg.Tasklab != nil { - id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname - } - f.fnLogger("Chan: ", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) - } - } + return } + + i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) + if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { + sdkhelper.EjectResult(nil, rMsg, out) + f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", tchan) + + return + } + + // conv to bgr24 and resize + imgW, imgH := int(i.Width), int(i.Height) + + f.fnLogger("######Run cid:", rMsg.Cid, "chan: ", tchan) + count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, tchan) + f.fnLogger("######Over run cid:", rMsg.Cid, "chan: ", tchan) + + f.fnLogger("######Eject cid:", rMsg.Cid, "chan: ", tchan) + sdkhelper.EjectResult(data, rMsg, out) + f.fnLogger("######Over eject cid:", rMsg.Cid, "chan: ", tchan) + + var id, name string + if rMsg.Tasklab != nil { + id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname + } + f.fnLogger("Chan: ", tchan, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) + } -- Gitblit v1.8.0