From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 15 一月 2020 09:23:17 +0800 Subject: [PATCH] copy from bgr-2-analysis --- run.go | 41 ++++----- common/recv.go | 46 ++++++++--- common/helper.go | 95 +++++------------------ common/send.go | 31 ++++--- 4 files changed, 91 insertions(+), 122 deletions(-) diff --git a/common/helper.go b/common/helper.go index 0e016c3..78298be 100644 --- a/common/helper.go +++ b/common/helper.go @@ -1,20 +1,22 @@ package common import ( - "context" "encoding/json" "fmt" "io/ioutil" "strconv" - "time" - "basic.com/libgowrapper/sdkstruct.git" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" "github.com/gogo/protobuf/proto" ) const mode = deliver.PushPull + +// MsgRS msg recv and snd +type MsgRS struct { + Msg protomsg.SdkMessage +} // GetIpcAddress get ipc func GetIpcAddress(shm bool, id string) string { @@ -59,109 +61,56 @@ return i } -// UnserilizeProto un -func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) { - for { - select { - case <-ctx.Done(): - return - case d := <-data: - if len(d) < 100 { - continue - } - msg := protomsg.SdkMessage{} - if err := proto.Unmarshal(d, &msg); err != nil { - fn(err, " msg 澶勭悊寮傚父") - continue - } - - out <- msg - - default: - time.Sleep(10 * time.Millisecond) - } - } -} - -// Msg2MsgSDK msg->msgsdk -func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK { - - d, err := proto.Marshal(&msg) - if err != nil { - return nil - } - - index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos) - if index >= count { - return &sdkstruct.MsgSDK{ - MsgData: d, - SdkCount: count, - SdkIndex: index, - SdkDataLen: 0, - } - } - - return &sdkstruct.MsgSDK{ - MsgData: d, - SdkCount: count, - SdkIndex: index, - SdkDataLen: len(d), - } -} - // EjectResult eject -func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) { +func EjectResult(res []byte, msg MsgRS, out chan<- MsgRS) { if res == nil { - s := Msg2MsgSDK(msg) - if s == nil { - return - } - out <- *s + out <- msg + return + } + index := int(msg.Msg.Tasklab.Index) + + if index >= len(msg.Msg.Tasklab.Sdkinfos) { return } - msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res + msg.Msg.Tasklab.Sdkinfos[index].Sdkdata = res - s := Msg2MsgSDK(msg) - if s == nil { - return - } - out <- *s + out <- msg } ///////////////////////////////////////////////////////////// // ValidRemoteMessage valid or not -func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool { - if msg.Tasklab == nil { +func ValidRemoteMessage(msg MsgRS, fnName string, fn func(...interface{})) bool { + if msg.Msg.Tasklab == nil { fn(fnName, " recieve msg nil") return false } - sdkLen := len(msg.Tasklab.Sdkinfos) + sdkLen := len(msg.Msg.Tasklab.Sdkinfos) if sdkLen == 0 { fn(fnName, " has no sdk info") return false } - curIndex := int(msg.Tasklab.Index) + curIndex := int(msg.Msg.Tasklab.Index) if curIndex < 0 || curIndex >= sdkLen { fn(fnName, " tasklab index ", curIndex, " error") return false } - if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName { - fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype) + if msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName { + fn(fnName, " is different from ", msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype) return false } return true } // UnpackImage unpack -func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image { +func UnpackImage(msg MsgRS, fnName string, fn func(...interface{})) *protomsg.Image { // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬 i := &protomsg.Image{} - err := proto.Unmarshal(msg.Data, i) + err := proto.Unmarshal(msg.Msg.Data, i) if err != nil { fn(fnName, " protobuf decode CameraImage error: ", err.Error()) return nil diff --git a/common/recv.go b/common/recv.go index fb31433..21d9b5e 100644 --- a/common/recv.go +++ b/common/recv.go @@ -5,26 +5,50 @@ "time" + "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" + "github.com/gogo/protobuf/proto" ) // Reciever recv from ipc type Reciever struct { ctx context.Context ipcURL string - out chan<- []byte + chMsg chan<- MsgRS shm bool fnLogger func(...interface{}) } // NewReciever new recv -func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { +func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever { return &Reciever{ ipcURL: url, - out: out, + chMsg: chMsg, shm: shm, fnLogger: fn, + } +} + +func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) { + for { + select { + case <-ctx.Done(): + return + default: + d := <-data + if len(d) < 100 { + continue + } + // logo.Infoln(len(d), "reciver鏁版嵁") + msg := protomsg.SdkMessage{} + if err := proto.Unmarshal(d, &msg); err != nil { + r.fnLogger(err, " msg 澶勭悊寮傚父") + continue + } + outMsg := MsgRS{Msg: msg} + r.chMsg <- outMsg + } } } @@ -40,8 +64,9 @@ func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { - // t := time.Now() - // sc := 0 + dataChan := make(chan []byte, 3) + + go r.unserilizeProto(ctx, dataChan) count := 0 @@ -75,7 +100,7 @@ count = 0 r.fnLogger("~~~shm recv image:", len(d)) } - r.out <- d + dataChan <- d } } } else { @@ -87,16 +112,9 @@ count = 0 r.fnLogger("~~~mangos recv image:", len(msg)) } - r.out <- msg + dataChan <- msg } } - - // sc++ - // if sc == 25 { - // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) - // sc = 0 - // t = time.Now() - // } } } diff --git a/common/send.go b/common/send.go index a0e7cc4..4b0a0e6 100644 --- a/common/send.go +++ b/common/send.go @@ -4,14 +4,14 @@ "context" "time" - "basic.com/libgowrapper/sdkstruct.git" "basic.com/valib/deliver.git" + "github.com/gogo/protobuf/proto" ) // Sender decoder ingo type Sender struct { ipcURL string - chMsg <-chan sdkstruct.MsgSDK + chMsg <-chan MsgRS shm bool fn func([]byte, bool) @@ -27,7 +27,7 @@ } // NewSender Sender -func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender { +func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool, fn func(...interface{})) *Sender { // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) return &Sender{ ipcURL: ipcURL, @@ -62,19 +62,28 @@ return case i := <-s.chMsg: - data <- i.MsgData + d, err := proto.Marshal(&i.Msg) - if int(i.SdkIndex+1) == i.SdkCount { + if err != nil { + s.fnLogger("protobuf encode ipc sender error: ", err) + continue + } + + data <- d + + if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) { if s.fn != nil { - sFlag := true - if i.SdkDataLen < 2 { - sFlag = false + for _, v := range i.Msg.Tasklab.Sdkinfos { + if len(v.Sdkdata) < 2 { + sFlag = false + break + } } - s.fn(i.MsgData, sFlag) - + s.fn(d, sFlag) } } + default: time.Sleep(10 * time.Millisecond) } @@ -82,8 +91,6 @@ } func (s *Sender) run(ctx context.Context, i deliver.Deliver) { - - // go ruleserver.TimeTicker() dataChan := make(chan []byte, 3) go s.serializeProto(ctx, dataChan) diff --git a/run.go b/run.go index 50cf58f..86cc73c 100644 --- a/run.go +++ b/run.go @@ -8,8 +8,6 @@ "face/common" - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" "basic.com/valib/gogpu.git" ) @@ -19,7 +17,7 @@ handle *SDKFace maxChannel int - ftrackChans map[string]chan protomsg.SdkMessage + ftrackChans map[string]chan common.MsgRS ftrackChannels map[string]int ftrackChanStats []bool chnLock sync.Mutex @@ -139,7 +137,7 @@ handle: handle, maxChannel: maxChan, - ftrackChans: make(map[string]chan protomsg.SdkMessage, maxChan), + ftrackChans: make(map[string]chan common.MsgRS, maxChan), ftrackChannels: make(map[string]int, maxChan), ftrackChanStats: make([]bool, maxChan, maxChan), @@ -173,8 +171,8 @@ ) ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull) ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush) - chRcv := make(chan []byte, s.maxChannel) - chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel) + chRcv := make(chan common.MsgRS, s.maxChannel) + chSnd := make(chan common.MsgRS, s.maxChannel) rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger) snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger) @@ -223,36 +221,33 @@ f.chnLock.Unlock() } -func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) { - - chMsg := make(chan protomsg.SdkMessage, f.maxChannel) - go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger) +func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) { for { select { case <-ctx.Done(): f.handle.Free() return - case rMsg := <-chMsg: + case rMsg := <-in: if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) { f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID") common.EjectResult(nil, rMsg, out) continue } - if _, ok := f.ftrackChans[rMsg.Cid]; ok { + if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok { f.fnLogger("Face Cache Size: ", len(f.ftrackChans)) - f.ftrackChans[rMsg.Cid] <- rMsg + f.ftrackChans[rMsg.Msg.Cid] <- rMsg } else { - f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, f.maxChannel) + f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel) chn := f.getAvailableChn() if chn < 0 { f.fnLogger("TOO MUCH CHANNEL") common.EjectResult(nil, rMsg, out) continue } - f.ftrackChannels[rMsg.Cid] = chn + f.ftrackChannels[rMsg.Msg.Cid] = chn i := common.UnpackImage(rMsg, f.typ, f.fnLogger) if i == nil { @@ -262,9 +257,9 @@ // conv to bgr24 and resize imgW, imgH := int(i.Width), int(i.Height) ret := f.handle.TrackerResize(imgW, imgH, chn) - f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret) - go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn) - f.ftrackChans[rMsg.Cid] <- rMsg + f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret) + go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn) + f.ftrackChans[rMsg.Msg.Cid] <- rMsg } default: time.Sleep(time.Millisecond * 100) @@ -272,7 +267,7 @@ } } -func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) { +func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) { tm := time.Now() sc := 0 f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn) @@ -301,7 +296,7 @@ // conv to bgr24 and resize imgW, imgH := int(i.Width), int(i.Height) - f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Cid) + f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Msg.Cid) count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn) @@ -311,10 +306,10 @@ f.mtxRunning.Unlock() var id, name string - if rMsg.Tasklab != nil { - id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname + if rMsg.Msg.Tasklab != nil { + id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname } - f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) + f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count) sc++ if sc == 25 { -- Gitblit v1.8.0