From 373c820ba3bd14ce3895d13e401aaf1cf62269af Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期六, 11 一月 2020 11:21:39 +0800
Subject: [PATCH] debug split
---
run.go | 161 +++++++++++++++++++++--------------------------------
1 files changed, 65 insertions(+), 96 deletions(-)
diff --git a/run.go b/run.go
index f915794..ede5e89 100644
--- a/run.go
+++ b/run.go
@@ -14,6 +14,7 @@
type face struct {
handle *SDKFace
+ list *sdkhelper.LockList
maxChannel int
ftrackChans map[string]chan protomsg.SdkMessage
@@ -100,6 +101,7 @@
return &face{
handle: handle,
+ list: sdkhelper.NewLockList(maxChan + maxChan/2),
maxChannel: maxChan,
ftrackChans: make(map[string]chan protomsg.SdkMessage, maxChan),
@@ -117,114 +119,52 @@
}
}
+func (f *face) release() {
+ if f.handle != nil {
+ f.handle.Free()
+ }
+}
+
// Run run
func Run(ctx context.Context, i interface{}) {
s := i.(*face)
- const (
- postPull = `_1`
- postPush = `_2`
- )
- ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull)
- ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush)
- chRcv := make(chan []byte)
- chSnd := make(chan sdkstruct.MsgSDK)
-
- rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
- snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
- torule := sdkhelper.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
-
- snder.ApplyCallbackFunc(torule.Push)
-
- go rcver.Run(ctx)
- go snder.Run(ctx)
- go torule.Run(ctx)
-
- go s.run(ctx, chRcv, chSnd)
+ chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
+ go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger)
}
-//////////////////////////////////////////////////////////////////
-const (
- cacheFrameNum = 3
- trackChnTimeout = time.Duration(10)
-)
+func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) {
-func (f *face) cleanChnStat() {
- f.chnLock.Lock()
- for i := 0; i < f.maxChannel; i++ {
- f.ftrackChanStats[i] = false
- }
- f.chnLock.Unlock()
-}
+ for _, msg := range msgs {
+ if _, ok := f.ftrackChans[msg.Cid]; ok {
+ f.ftrackChans[msg.Cid] <- msg
+ } else {
-func (f *face) getAvailableChn() int {
- f.chnLock.Lock()
- defer f.chnLock.Unlock()
-
- for i := 0; i < f.maxChannel; i++ {
- if f.ftrackChanStats[i] == false {
- f.ftrackChanStats[i] = true
- return i
- }
- }
- return -1
-}
-
-func (f *face) releaseChn(chn int) {
- f.chnLock.Lock()
- f.ftrackChanStats[chn] = false
- f.chnLock.Unlock()
-}
-
-func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
-
- chMsg := make(chan protomsg.SdkMessage)
- go sdkhelper.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
-
- for {
- select {
- case <-ctx.Done():
- f.handle.Free()
- return
- case rMsg := <-chMsg:
- if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
- f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
- sdkhelper.EjectResult(nil, rMsg, out)
+ f.ftrackChans[msg.Cid] = make(chan protomsg.SdkMessage, cacheFrameNum)
+ chn := f.getAvailableChn()
+ if chn < 0 {
+ f.fnLogger("TOO MUCH CHANNEL")
+ sdkhelper.EjectResult(nil, msg, out)
continue
}
+ f.ftrackChannels[msg.Cid] = chn
- if _, ok := f.ftrackChans[rMsg.Cid]; ok {
- f.ftrackChans[rMsg.Cid] <- rMsg
- } else {
-
- f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, cacheFrameNum)
- chn := f.getAvailableChn()
- if chn < 0 {
- f.fnLogger("TOO MUCH CHANNEL")
- sdkhelper.EjectResult(nil, rMsg, out)
- continue
- }
- f.ftrackChannels[rMsg.Cid] = chn
-
- i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
- if i == nil {
- sdkhelper.EjectResult(nil, rMsg, out)
- continue
- }
- // conv to bgr24 and resize
- imgW, imgH := int(i.Width), int(i.Height)
- ret := f.handle.TrackerResize(imgW, imgH, chn)
- f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
- go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn)
- f.ftrackChans[rMsg.Cid] <- rMsg
+ i := sdkhelper.UnpackImage(msg, f.typ, f.fnLogger)
+ if i == nil {
+ sdkhelper.EjectResult(nil, msg, out)
+ continue
}
- default:
- time.Sleep(time.Millisecond * 100)
+ // conv to bgr24 and resize
+ imgW, imgH := int(i.Width), int(i.Height)
+ ret := f.handle.TrackerResize(imgW, imgH, chn)
+ f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
+ go f.detectTrackOneChn(f.ftrackChans[msg.Cid], out, chn)
+ f.ftrackChans[msg.Cid] <- msg
}
}
}
-func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) {
+func (f *face) detectTrackOneChn(in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) {
tm := time.Now()
sc := 0
f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
@@ -232,9 +172,6 @@
for {
select {
- case <-ctx.Done():
- return
-
case rMsg := <-in:
if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
@@ -281,7 +218,39 @@
f.releaseChn(dtchn)
}
return
-
}
}
}
+
+//////////////////////////////////////////////////////////////////
+const (
+ cacheFrameNum = 3
+ trackChnTimeout = time.Duration(10)
+)
+
+func (f *face) cleanChnStat() {
+ f.chnLock.Lock()
+ for i := 0; i < f.maxChannel; i++ {
+ f.ftrackChanStats[i] = false
+ }
+ f.chnLock.Unlock()
+}
+
+func (f *face) getAvailableChn() int {
+ f.chnLock.Lock()
+ defer f.chnLock.Unlock()
+
+ for i := 0; i < f.maxChannel; i++ {
+ if f.ftrackChanStats[i] == false {
+ f.ftrackChanStats[i] = true
+ return i
+ }
+ }
+ return -1
+}
+
+func (f *face) releaseChn(chn int) {
+ f.chnLock.Lock()
+ f.ftrackChanStats[chn] = false
+ f.chnLock.Unlock()
+}
--
Gitblit v1.8.0