From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 一月 2020 09:23:17 +0800
Subject: [PATCH] copy from bgr-2-analysis
---
run.go | 41 ++++-----
common/recv.go | 46 ++++++++---
common/helper.go | 95 +++++------------------
common/send.go | 31 ++++---
4 files changed, 91 insertions(+), 122 deletions(-)
diff --git a/common/helper.go b/common/helper.go
index 0e016c3..78298be 100644
--- a/common/helper.go
+++ b/common/helper.go
@@ -1,20 +1,22 @@
package common
import (
- "context"
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
- "time"
- "basic.com/libgowrapper/sdkstruct.git"
"basic.com/pubsub/protomsg.git"
"basic.com/valib/deliver.git"
"github.com/gogo/protobuf/proto"
)
const mode = deliver.PushPull
+
+// MsgRS msg recv and snd
+type MsgRS struct {
+ Msg protomsg.SdkMessage
+}
// GetIpcAddress get ipc
func GetIpcAddress(shm bool, id string) string {
@@ -59,109 +61,56 @@
return i
}
-// UnserilizeProto un
-func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
- for {
- select {
- case <-ctx.Done():
- return
- case d := <-data:
- if len(d) < 100 {
- continue
- }
- msg := protomsg.SdkMessage{}
- if err := proto.Unmarshal(d, &msg); err != nil {
- fn(err, " msg 澶勭悊寮傚父")
- continue
- }
-
- out <- msg
-
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
-}
-
-// Msg2MsgSDK msg->msgsdk
-func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
-
- d, err := proto.Marshal(&msg)
- if err != nil {
- return nil
- }
-
- index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
- if index >= count {
- return &sdkstruct.MsgSDK{
- MsgData: d,
- SdkCount: count,
- SdkIndex: index,
- SdkDataLen: 0,
- }
- }
-
- return &sdkstruct.MsgSDK{
- MsgData: d,
- SdkCount: count,
- SdkIndex: index,
- SdkDataLen: len(d),
- }
-}
-
// EjectResult eject
-func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
+func EjectResult(res []byte, msg MsgRS, out chan<- MsgRS) {
if res == nil {
- s := Msg2MsgSDK(msg)
- if s == nil {
- return
- }
- out <- *s
+ out <- msg
+ return
+ }
+ index := int(msg.Msg.Tasklab.Index)
+
+ if index >= len(msg.Msg.Tasklab.Sdkinfos) {
return
}
- msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
+ msg.Msg.Tasklab.Sdkinfos[index].Sdkdata = res
- s := Msg2MsgSDK(msg)
- if s == nil {
- return
- }
- out <- *s
+ out <- msg
}
/////////////////////////////////////////////////////////////
// ValidRemoteMessage valid or not
-func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
- if msg.Tasklab == nil {
+func ValidRemoteMessage(msg MsgRS, fnName string, fn func(...interface{})) bool {
+ if msg.Msg.Tasklab == nil {
fn(fnName, " recieve msg nil")
return false
}
- sdkLen := len(msg.Tasklab.Sdkinfos)
+ sdkLen := len(msg.Msg.Tasklab.Sdkinfos)
if sdkLen == 0 {
fn(fnName, " has no sdk info")
return false
}
- curIndex := int(msg.Tasklab.Index)
+ curIndex := int(msg.Msg.Tasklab.Index)
if curIndex < 0 || curIndex >= sdkLen {
fn(fnName, " tasklab index ", curIndex, " error")
return false
}
- if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
- fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
+ if msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
+ fn(fnName, " is different from ", msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype)
return false
}
return true
}
// UnpackImage unpack
-func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
+func UnpackImage(msg MsgRS, fnName string, fn func(...interface{})) *protomsg.Image {
// 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
i := &protomsg.Image{}
- err := proto.Unmarshal(msg.Data, i)
+ err := proto.Unmarshal(msg.Msg.Data, i)
if err != nil {
fn(fnName, " protobuf decode CameraImage error: ", err.Error())
return nil
diff --git a/common/recv.go b/common/recv.go
index fb31433..21d9b5e 100644
--- a/common/recv.go
+++ b/common/recv.go
@@ -5,26 +5,50 @@
"time"
+ "basic.com/pubsub/protomsg.git"
"basic.com/valib/deliver.git"
+ "github.com/gogo/protobuf/proto"
)
// Reciever recv from ipc
type Reciever struct {
ctx context.Context
ipcURL string
- out chan<- []byte
+ chMsg chan<- MsgRS
shm bool
fnLogger func(...interface{})
}
// NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
return &Reciever{
ipcURL: url,
- out: out,
+ chMsg: chMsg,
shm: shm,
fnLogger: fn,
+ }
+}
+
+func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ d := <-data
+ if len(d) < 100 {
+ continue
+ }
+ // logo.Infoln(len(d), "reciver鏁版嵁")
+ msg := protomsg.SdkMessage{}
+ if err := proto.Unmarshal(d, &msg); err != nil {
+ r.fnLogger(err, " msg 澶勭悊寮傚父")
+ continue
+ }
+ outMsg := MsgRS{Msg: msg}
+ r.chMsg <- outMsg
+ }
}
}
@@ -40,8 +64,9 @@
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
- // t := time.Now()
- // sc := 0
+ dataChan := make(chan []byte, 3)
+
+ go r.unserilizeProto(ctx, dataChan)
count := 0
@@ -75,7 +100,7 @@
count = 0
r.fnLogger("~~~shm recv image:", len(d))
}
- r.out <- d
+ dataChan <- d
}
}
} else {
@@ -87,16 +112,9 @@
count = 0
r.fnLogger("~~~mangos recv image:", len(msg))
}
- r.out <- msg
+ dataChan <- msg
}
}
-
- // sc++
- // if sc == 25 {
- // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
- // sc = 0
- // t = time.Now()
- // }
}
}
diff --git a/common/send.go b/common/send.go
index a0e7cc4..4b0a0e6 100644
--- a/common/send.go
+++ b/common/send.go
@@ -4,14 +4,14 @@
"context"
"time"
- "basic.com/libgowrapper/sdkstruct.git"
"basic.com/valib/deliver.git"
+ "github.com/gogo/protobuf/proto"
)
// Sender decoder ingo
type Sender struct {
ipcURL string
- chMsg <-chan sdkstruct.MsgSDK
+ chMsg <-chan MsgRS
shm bool
fn func([]byte, bool)
@@ -27,7 +27,7 @@
}
// NewSender Sender
-func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
+func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool, fn func(...interface{})) *Sender {
// logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
return &Sender{
ipcURL: ipcURL,
@@ -62,19 +62,28 @@
return
case i := <-s.chMsg:
- data <- i.MsgData
+ d, err := proto.Marshal(&i.Msg)
- if int(i.SdkIndex+1) == i.SdkCount {
+ if err != nil {
+ s.fnLogger("protobuf encode ipc sender error: ", err)
+ continue
+ }
+
+ data <- d
+
+ if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) {
if s.fn != nil {
-
sFlag := true
- if i.SdkDataLen < 2 {
- sFlag = false
+ for _, v := range i.Msg.Tasklab.Sdkinfos {
+ if len(v.Sdkdata) < 2 {
+ sFlag = false
+ break
+ }
}
- s.fn(i.MsgData, sFlag)
-
+ s.fn(d, sFlag)
}
}
+
default:
time.Sleep(10 * time.Millisecond)
}
@@ -82,8 +91,6 @@
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
-
- // go ruleserver.TimeTicker()
dataChan := make(chan []byte, 3)
go s.serializeProto(ctx, dataChan)
diff --git a/run.go b/run.go
index 50cf58f..86cc73c 100644
--- a/run.go
+++ b/run.go
@@ -8,8 +8,6 @@
"face/common"
- "basic.com/libgowrapper/sdkstruct.git"
- "basic.com/pubsub/protomsg.git"
"basic.com/valib/gogpu.git"
)
@@ -19,7 +17,7 @@
handle *SDKFace
maxChannel int
- ftrackChans map[string]chan protomsg.SdkMessage
+ ftrackChans map[string]chan common.MsgRS
ftrackChannels map[string]int
ftrackChanStats []bool
chnLock sync.Mutex
@@ -139,7 +137,7 @@
handle: handle,
maxChannel: maxChan,
- ftrackChans: make(map[string]chan protomsg.SdkMessage, maxChan),
+ ftrackChans: make(map[string]chan common.MsgRS, maxChan),
ftrackChannels: make(map[string]int, maxChan),
ftrackChanStats: make([]bool, maxChan, maxChan),
@@ -173,8 +171,8 @@
)
ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
- chRcv := make(chan []byte, s.maxChannel)
- chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel)
+ chRcv := make(chan common.MsgRS, s.maxChannel)
+ chSnd := make(chan common.MsgRS, s.maxChannel)
rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
@@ -223,36 +221,33 @@
f.chnLock.Unlock()
}
-func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
-
- chMsg := make(chan protomsg.SdkMessage, f.maxChannel)
- go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
+func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) {
for {
select {
case <-ctx.Done():
f.handle.Free()
return
- case rMsg := <-chMsg:
+ case rMsg := <-in:
if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
common.EjectResult(nil, rMsg, out)
continue
}
- if _, ok := f.ftrackChans[rMsg.Cid]; ok {
+ if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok {
f.fnLogger("Face Cache Size: ", len(f.ftrackChans))
- f.ftrackChans[rMsg.Cid] <- rMsg
+ f.ftrackChans[rMsg.Msg.Cid] <- rMsg
} else {
- f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, f.maxChannel)
+ f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel)
chn := f.getAvailableChn()
if chn < 0 {
f.fnLogger("TOO MUCH CHANNEL")
common.EjectResult(nil, rMsg, out)
continue
}
- f.ftrackChannels[rMsg.Cid] = chn
+ f.ftrackChannels[rMsg.Msg.Cid] = chn
i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
if i == nil {
@@ -262,9 +257,9 @@
// conv to bgr24 and resize
imgW, imgH := int(i.Width), int(i.Height)
ret := f.handle.TrackerResize(imgW, imgH, chn)
- f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
- go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn)
- f.ftrackChans[rMsg.Cid] <- rMsg
+ f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
+ go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn)
+ f.ftrackChans[rMsg.Msg.Cid] <- rMsg
}
default:
time.Sleep(time.Millisecond * 100)
@@ -272,7 +267,7 @@
}
}
-func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) {
+func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) {
tm := time.Now()
sc := 0
f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
@@ -301,7 +296,7 @@
// conv to bgr24 and resize
imgW, imgH := int(i.Width), int(i.Height)
- f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Cid)
+ f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Msg.Cid)
count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
@@ -311,10 +306,10 @@
f.mtxRunning.Unlock()
var id, name string
- if rMsg.Tasklab != nil {
- id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
+ if rMsg.Msg.Tasklab != nil {
+ id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname
}
- f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
+ f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
sc++
if sc == 25 {
--
Gitblit v1.8.0