From 2ff2fb5539ebc07fac8326651c6afc6bd9c374cb Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 13 一月 2020 10:36:36 +0800
Subject: [PATCH] debug
---
run.go | 152 ++++++++++++++++++++++++++------------------------
1 files changed, 80 insertions(+), 72 deletions(-)
diff --git a/run.go b/run.go
index 023e949..c94aeff 100644
--- a/run.go
+++ b/run.go
@@ -14,13 +14,8 @@
type channel struct {
index int
- msgs *sdkhelper.LockList
+ life int
}
-
-//////////////////////////////////////////////////////////////////
-const (
- trackChnTimeout = time.Duration(10)
-)
func (f *face) channelAvailable() int {
f.mapChanLock.Lock()
@@ -41,13 +36,6 @@
return -1
}
-func (f *face) channelRelease(id string) {
- f.mapChanLock.Lock()
- defer f.mapChanLock.Unlock()
-
- delete(f.mapChan, id)
-}
-
func (f *face) channelExist(id string) (*channel, bool) {
f.mapChanLock.Lock()
defer f.mapChanLock.Unlock()
@@ -66,7 +54,7 @@
f.mapChanLock.Lock()
c := &channel{
index: index,
- msgs: sdkhelper.NewLockList(3),
+ life: f.maxLife,
}
f.mapChan[id] = c
f.mapChanLock.Unlock()
@@ -74,11 +62,39 @@
return c
}
+func (f *face) maintainChannel(ctx context.Context, ch <-chan string) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case id := <-ch:
+ f.mapChanLock.Lock()
+ defer f.mapChanLock.Unlock()
+ for k, v := range f.mapChan {
+ v.life--
+ if k == id {
+ v.life = f.maxLife
+ }
+ }
+ for k, v := range f.mapChan {
+ if v.life < 0 {
+ delete(f.mapChan, k)
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
type face struct {
handle *SDKFace
list *sdkhelper.LockList
- maxChannel int
+ maxChannel int
+
+ maxLife int
+ chLife chan string
mapChan map[string]*channel
mapChanLock sync.Mutex
@@ -163,6 +179,8 @@
handle: handle,
list: sdkhelper.NewLockList(maxChan + maxChan/2),
+ maxLife: trckInterval * 10,
+ chLife: make(chan string, maxChan),
maxChannel: maxChan,
mapChan: make(map[string]*channel, maxChan),
@@ -187,16 +205,24 @@
func Run(ctx context.Context, i interface{}) {
s := i.(*face)
+ go s.maintainChannel(ctx, s.chLife)
+
chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger)
}
func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) {
+ wg := &sync.WaitGroup{}
+
for _, msg := range msgs {
+ f.fnLogger("######Maintain :", msg.Cid)
+ f.chLife <- msg.Cid
+ f.fnLogger("######Maintain Over :", msg.Cid)
+
if c, ok := f.channelExist(msg.Cid); ok {
- f.fnLogger("######recv from :", msg.Cid, " Chan:", c.index)
- c.msgs.Push(msg)
+ wg.Add(1)
+ go f.track(wg, &msg, c.index, out)
} else {
nc := f.channelNew(msg.Cid)
@@ -215,69 +241,51 @@
imgW, imgH := int(i.Width), int(i.Height)
ret := f.handle.TrackerResize(imgW, imgH, nc.index)
f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", nc.index, " wXh: ", imgW, "x", imgH, " result:", ret)
- go f.detectTrackOneChn(nc, out)
- nc.msgs.Push(msg)
+ wg.Add(1)
+ go f.track(wg, &msg, nc.index, out)
}
}
+ wg.Wait()
}
-func (f *face) detectTrackOneChn(c *channel, out chan<- sdkstruct.MsgSDK) {
+func (f *face) track(wg *sync.WaitGroup, msg *protomsg.SdkMessage, tchan int, out chan<- sdkstruct.MsgSDK) {
- dtchn := c.index
- var curCid string
+ defer wg.Done()
- for {
- select {
+ rMsg := *msg
- case <-time.After(trackChnTimeout * time.Second):
- f.fnLogger("######Timeout to get image, curCid:", curCid)
- if curCid != "" {
- f.channelRelease(curCid)
- }
- return
- default:
+ f.fnLogger("######Recv From: ", rMsg.Cid, " Chan: ", tchan)
- elems := c.msgs.Drain()
- var msgs []protomsg.SdkMessage
- for _, v := range elems {
- msgs = append(msgs, v.(protomsg.SdkMessage))
- }
- for _, rMsg := range msgs {
+ if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
+ sdkhelper.EjectResult(nil, rMsg, out)
+ f.fnLogger("Face!!!!!!SkdMessage Invalid: ", tchan)
- if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
- sdkhelper.EjectResult(nil, rMsg, out)
- f.fnLogger("Face!!!!!!SkdMessage Invalid: ", dtchn)
-
- continue
- }
-
- i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
- if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
- sdkhelper.EjectResult(nil, rMsg, out)
- f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", dtchn)
-
- continue
- }
-
- curCid = i.Cid
-
- // conv to bgr24 and resize
- imgW, imgH := int(i.Width), int(i.Height)
-
- f.fnLogger("######run cid:", rMsg.Cid, "chan: ", dtchn)
- count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
- f.fnLogger("######over run cid:", rMsg.Cid, "chan: ", dtchn)
-
- f.fnLogger("######eject cid:", rMsg.Cid, "chan: ", dtchn)
- sdkhelper.EjectResult(data, rMsg, out)
- f.fnLogger("######over eject cid:", rMsg.Cid, "chan: ", dtchn)
-
- var id, name string
- if rMsg.Tasklab != nil {
- id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
- }
- f.fnLogger("Chan: ", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
- }
- }
+ return
}
+
+ i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
+ if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
+ sdkhelper.EjectResult(nil, rMsg, out)
+ f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", tchan)
+
+ return
+ }
+
+ // conv to bgr24 and resize
+ imgW, imgH := int(i.Width), int(i.Height)
+
+ f.fnLogger("######Run cid:", rMsg.Cid, "chan: ", tchan)
+ count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, tchan)
+ f.fnLogger("######Over run cid:", rMsg.Cid, "chan: ", tchan)
+
+ f.fnLogger("######Eject cid:", rMsg.Cid, "chan: ", tchan)
+ sdkhelper.EjectResult(data, rMsg, out)
+ f.fnLogger("######Over eject cid:", rMsg.Cid, "chan: ", tchan)
+
+ var id, name string
+ if rMsg.Tasklab != nil {
+ id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
+ }
+ f.fnLogger("Chan: ", tchan, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
+
}
--
Gitblit v1.8.0