From 51f95d190fb7950638a2896cdd1fef7c6ed2b08f Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 13 一月 2020 09:30:03 +0800 Subject: [PATCH] debug --- run.go | 292 ++++++++++++++++++++++++++++------------------------------ 1 files changed, 142 insertions(+), 150 deletions(-) diff --git a/run.go b/run.go index f7e9e12..8a03e1e 100644 --- a/run.go +++ b/run.go @@ -12,14 +12,75 @@ "basic.com/valib/gogpu.git" ) +type channel struct { + index int + msgs *sdkhelper.LockList +} + +////////////////////////////////////////////////////////////////// +const ( + trackChnTimeout = time.Duration(10) +) + +func (f *face) channelAvailable() int { + f.mapChanLock.Lock() + defer f.mapChanLock.Unlock() + + for i := 0; i < f.maxChannel; i++ { + found := false + for _, v := range f.mapChan { + if v.index == i { + found = true + break + } + } + if !found { + return i + } + } + return -1 +} + +func (f *face) channelRelease(id string) { + f.mapChanLock.Lock() + defer f.mapChanLock.Unlock() + + delete(f.mapChan, id) +} + +func (f *face) channelExist(id string) (*channel, bool) { + f.mapChanLock.Lock() + defer f.mapChanLock.Unlock() + + c, ok := f.mapChan[id] + return c, ok +} + +func (f *face) channelNew(id string) *channel { + + index := f.channelAvailable() + if index < 0 { + return nil + } + + f.mapChanLock.Lock() + c := &channel{ + index: index, + msgs: sdkhelper.NewLockList(3), + } + f.mapChan[id] = c + f.mapChanLock.Unlock() + + return c +} + type face struct { handle *SDKFace + list *sdkhelper.LockList - maxChannel int - ftrackChans map[string]chan protomsg.SdkMessage - ftrackChannels map[string]int - ftrackChanStats []bool - chnLock sync.Mutex + maxChannel int + mapChan map[string]*channel + mapChanLock sync.Mutex fnLogger func(...interface{}) @@ -28,11 +89,11 @@ shm bool ipc2Rule string ruleMsgMaxCacheSize int - reserved map[string]string + reserved map[string]interface{} } // Create create sdk -func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{} { +func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} { handle := NewSDK(fn) if handle == nil { @@ -100,11 +161,10 @@ return &face{ handle: handle, + list: sdkhelper.NewLockList(maxChan + maxChan/2), - maxChannel: maxChan, - ftrackChans: make(map[string]chan protomsg.SdkMessage, maxChan), - ftrackChannels: make(map[string]int, maxChan), - ftrackChanStats: make([]bool, maxChan, maxChan), + maxChannel: maxChan, + mapChan: make(map[string]*channel, maxChan), fnLogger: fn, @@ -117,171 +177,103 @@ } } +func (f *face) release() { + if f.handle != nil { + f.handle.Free() + } +} + // Run run func Run(ctx context.Context, i interface{}) { s := i.(*face) - const ( - postPull = `_1` - postPush = `_2` - ) - ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull) - ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush) - chRcv := make(chan []byte) - chSnd := make(chan sdkstruct.MsgSDK) - - rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger) - snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger) - torule := sdkhelper.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger) - - snder.ApplyCallbackFunc(torule.Push) - - go rcver.Run(ctx) - go snder.Run(ctx) - go torule.Run(ctx) - - go s.run(ctx, chRcv, chSnd) + chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger) + go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger) } -////////////////////////////////////////////////////////////////// -const ( - cacheFrameNum = 3 - trackChnTimeout = time.Duration(10) -) +func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) { -func (f *face) cleanChnStat() { - f.chnLock.Lock() - for i := 0; i < f.maxChannel; i++ { - f.ftrackChanStats[i] = false - } - f.chnLock.Unlock() -} + for _, msg := range msgs { + if c, ok := f.channelExist(msg.Cid); ok { + c.msgs.Push(msg) + } else { -func (f *face) getAvailableChn() int { - f.chnLock.Lock() - defer f.chnLock.Unlock() - - for i := 0; i < f.maxChannel; i++ { - if f.ftrackChanStats[i] == false { - f.ftrackChanStats[i] = true - return i - } - } - return -1 -} - -func (f *face) releaseChn(chn int) { - f.chnLock.Lock() - f.ftrackChanStats[chn] = false - f.chnLock.Unlock() -} - -func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) { - - chMsg := make(chan protomsg.SdkMessage) - go sdkhelper.UnserilizeProto(ctx, in, chMsg, f.fnLogger) - - for { - select { - case <-ctx.Done(): - f.handle.Free() - return - case rMsg := <-chMsg: - if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { - f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID") - sdkhelper.EjectResult(nil, rMsg, out) + nc := f.channelNew(msg.Cid) + if nc == nil { + f.fnLogger("TOO MUCH CHANNEL") + sdkhelper.EjectResult(nil, msg, out) continue } - if _, ok := f.ftrackChans[rMsg.Cid]; ok { - f.ftrackChans[rMsg.Cid] <- rMsg - } else { - - f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, cacheFrameNum) - chn := f.getAvailableChn() - if chn < 0 { - f.fnLogger("TOO MUCH CHANNEL") - sdkhelper.EjectResult(nil, rMsg, out) - continue - } - f.ftrackChannels[rMsg.Cid] = chn - - i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) - if i == nil { - sdkhelper.EjectResult(nil, rMsg, out) - continue - } - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - ret := f.handle.TrackerResize(imgW, imgH, chn) - f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret) - go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn) - f.ftrackChans[rMsg.Cid] <- rMsg + i := sdkhelper.UnpackImage(msg, f.typ, f.fnLogger) + if i == nil { + sdkhelper.EjectResult(nil, msg, out) + continue } - default: - time.Sleep(time.Millisecond * 100) + // conv to bgr24 and resize + imgW, imgH := int(i.Width), int(i.Height) + ret := f.handle.TrackerResize(imgW, imgH, nc.index) + f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", nc.index, " wXh: ", imgW, "x", imgH, " result:", ret) + go f.detectTrackOneChn(nc, out) + nc.msgs.Push(msg) } } } -func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) { - tm := time.Now() - sc := 0 - f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn) +func (f *face) detectTrackOneChn(c *channel, out chan<- sdkstruct.MsgSDK) { + + dtchn := c.index var curCid string for { select { - case <-ctx.Done(): - return - case rMsg := <-in: - - if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { - sdkhelper.EjectResult(nil, rMsg, out) - continue - } - - i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) - if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { - sdkhelper.EjectResult(nil, rMsg, out) - continue - } - - curCid = i.Cid - - // conv to bgr24 and resize - imgW, imgH := int(i.Width), int(i.Height) - - count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn) - - sdkhelper.EjectResult(data, rMsg, out) - var id, name string - if rMsg.Tasklab != nil { - id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname - } - f.fnLogger("CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) - - sc++ - if sc == 25 { - f.fnLogger("CHAN:%d, FACE RUN 25 FRAME USE TIME: ", dtchn, time.Since(tm)) - sc = 0 - tm = time.Now() - } - - if time.Since(tm) > time.Second { - f.fnLogger("CHAN: ", dtchn, " FACE RUN ", sc, " FRAME USE TIME: ", time.Since(tm)) - sc = 0 - tm = time.Now() - } case <-time.After(trackChnTimeout * time.Second): - f.fnLogger("Timeout to get image, curCid:", curCid) + f.fnLogger("######Timeout to get image, curCid:", curCid, " Chan:", dtchn) if curCid != "" { - delete(f.ftrackChans, curCid) - f.releaseChn(dtchn) + f.channelRelease(curCid) } return + default: + // f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn) + elems := c.msgs.Drain() + var msgs []protomsg.SdkMessage + for _, v := range elems { + msgs = append(msgs, v.(protomsg.SdkMessage)) + } + for _, rMsg := range msgs { + + if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { + sdkhelper.EjectResult(nil, rMsg, out) + f.fnLogger("Face!!!!!!SkdMessage Invalid: ", dtchn) + + continue + } + + i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger) + if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { + sdkhelper.EjectResult(nil, rMsg, out) + f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", dtchn) + + continue + } + + curCid = i.Cid + + // conv to bgr24 and resize + imgW, imgH := int(i.Width), int(i.Height) + + count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn) + + sdkhelper.EjectResult(data, rMsg, out) + + var id, name string + if rMsg.Tasklab != nil { + id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname + } + // f.fnLogger("Chan: ", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) + } } } } -- Gitblit v1.8.0