From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 一月 2020 09:23:17 +0800
Subject: [PATCH] copy from bgr-2-analysis

---
 run.go           |   41 ++++-----
 common/recv.go   |   46 ++++++++---
 common/helper.go |   95 +++++------------------
 common/send.go   |   31 ++++---
 4 files changed, 91 insertions(+), 122 deletions(-)

diff --git a/common/helper.go b/common/helper.go
index 0e016c3..78298be 100644
--- a/common/helper.go
+++ b/common/helper.go
@@ -1,20 +1,22 @@
 package common
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"strconv"
-	"time"
 
-	"basic.com/libgowrapper/sdkstruct.git"
 	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/deliver.git"
 	"github.com/gogo/protobuf/proto"
 )
 
 const mode = deliver.PushPull
+
+// MsgRS msg recv and snd
+type MsgRS struct {
+	Msg protomsg.SdkMessage
+}
 
 // GetIpcAddress get ipc
 func GetIpcAddress(shm bool, id string) string {
@@ -59,109 +61,56 @@
 	return i
 }
 
-// UnserilizeProto un
-func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case d := <-data:
-			if len(d) < 100 {
-				continue
-			}
-			msg := protomsg.SdkMessage{}
-			if err := proto.Unmarshal(d, &msg); err != nil {
-				fn(err, " msg 澶勭悊寮傚父")
-				continue
-			}
-
-			out <- msg
-
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-// Msg2MsgSDK msg->msgsdk
-func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
-
-	d, err := proto.Marshal(&msg)
-	if err != nil {
-		return nil
-	}
-
-	index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
-	if index >= count {
-		return &sdkstruct.MsgSDK{
-			MsgData:    d,
-			SdkCount:   count,
-			SdkIndex:   index,
-			SdkDataLen: 0,
-		}
-	}
-
-	return &sdkstruct.MsgSDK{
-		MsgData:    d,
-		SdkCount:   count,
-		SdkIndex:   index,
-		SdkDataLen: len(d),
-	}
-}
-
 // EjectResult eject
-func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
+func EjectResult(res []byte, msg MsgRS, out chan<- MsgRS) {
 
 	if res == nil {
-		s := Msg2MsgSDK(msg)
-		if s == nil {
-			return
-		}
-		out <- *s
+		out <- msg
+		return
+	}
+	index := int(msg.Msg.Tasklab.Index)
+
+	if index >= len(msg.Msg.Tasklab.Sdkinfos) {
 		return
 	}
 
-	msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
+	msg.Msg.Tasklab.Sdkinfos[index].Sdkdata = res
 
-	s := Msg2MsgSDK(msg)
-	if s == nil {
-		return
-	}
-	out <- *s
+	out <- msg
 }
 
 /////////////////////////////////////////////////////////////
 
 // ValidRemoteMessage valid or not
-func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
-	if msg.Tasklab == nil {
+func ValidRemoteMessage(msg MsgRS, fnName string, fn func(...interface{})) bool {
+	if msg.Msg.Tasklab == nil {
 		fn(fnName, " recieve msg nil")
 		return false
 	}
 
-	sdkLen := len(msg.Tasklab.Sdkinfos)
+	sdkLen := len(msg.Msg.Tasklab.Sdkinfos)
 	if sdkLen == 0 {
 		fn(fnName, " has no sdk info")
 		return false
 	}
 
-	curIndex := int(msg.Tasklab.Index)
+	curIndex := int(msg.Msg.Tasklab.Index)
 	if curIndex < 0 || curIndex >= sdkLen {
 		fn(fnName, " tasklab index ", curIndex, " error")
 		return false
 	}
-	if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
-		fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
+	if msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
+		fn(fnName, " is different from ", msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype)
 		return false
 	}
 	return true
 }
 
 // UnpackImage unpack
-func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
+func UnpackImage(msg MsgRS, fnName string, fn func(...interface{})) *protomsg.Image {
 	// 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
 	i := &protomsg.Image{}
-	err := proto.Unmarshal(msg.Data, i)
+	err := proto.Unmarshal(msg.Msg.Data, i)
 	if err != nil {
 		fn(fnName, " protobuf decode CameraImage error: ", err.Error())
 		return nil
diff --git a/common/recv.go b/common/recv.go
index fb31433..21d9b5e 100644
--- a/common/recv.go
+++ b/common/recv.go
@@ -5,26 +5,50 @@
 
 	"time"
 
+	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/deliver.git"
+	"github.com/gogo/protobuf/proto"
 )
 
 // Reciever recv from ipc
 type Reciever struct {
 	ctx    context.Context
 	ipcURL string
-	out    chan<- []byte
+	chMsg  chan<- MsgRS
 
 	shm      bool
 	fnLogger func(...interface{})
 }
 
 // NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
 	return &Reciever{
 		ipcURL:   url,
-		out:      out,
+		chMsg:    chMsg,
 		shm:      shm,
 		fnLogger: fn,
+	}
+}
+
+func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			d := <-data
+			if len(d) < 100 {
+				continue
+			}
+			// logo.Infoln(len(d), "reciver鏁版嵁")
+			msg := protomsg.SdkMessage{}
+			if err := proto.Unmarshal(d, &msg); err != nil {
+				r.fnLogger(err, " msg 澶勭悊寮傚父")
+				continue
+			}
+			outMsg := MsgRS{Msg: msg}
+			r.chMsg <- outMsg
+		}
 	}
 }
 
@@ -40,8 +64,9 @@
 
 func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
 
-	// t := time.Now()
-	// sc := 0
+	dataChan := make(chan []byte, 3)
+
+	go r.unserilizeProto(ctx, dataChan)
 
 	count := 0
 
@@ -75,7 +100,7 @@
 							count = 0
 							r.fnLogger("~~~shm recv image:", len(d))
 						}
-						r.out <- d
+						dataChan <- d
 					}
 				}
 			} else {
@@ -87,16 +112,9 @@
 						count = 0
 						r.fnLogger("~~~mangos recv image:", len(msg))
 					}
-					r.out <- msg
+					dataChan <- msg
 				}
 			}
-
-			// sc++
-			// if sc == 25 {
-			// 	logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
-			// 	sc = 0
-			// 	t = time.Now()
-			// }
 
 		}
 	}
diff --git a/common/send.go b/common/send.go
index a0e7cc4..4b0a0e6 100644
--- a/common/send.go
+++ b/common/send.go
@@ -4,14 +4,14 @@
 	"context"
 	"time"
 
-	"basic.com/libgowrapper/sdkstruct.git"
 	"basic.com/valib/deliver.git"
+	"github.com/gogo/protobuf/proto"
 )
 
 // Sender decoder ingo
 type Sender struct {
 	ipcURL string
-	chMsg  <-chan sdkstruct.MsgSDK
+	chMsg  <-chan MsgRS
 	shm    bool
 	fn     func([]byte, bool)
 
@@ -27,7 +27,7 @@
 }
 
 // NewSender Sender
-func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
+func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool, fn func(...interface{})) *Sender {
 	// logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
 	return &Sender{
 		ipcURL:   ipcURL,
@@ -62,19 +62,28 @@
 			return
 		case i := <-s.chMsg:
 
-			data <- i.MsgData
+			d, err := proto.Marshal(&i.Msg)
 
-			if int(i.SdkIndex+1) == i.SdkCount {
+			if err != nil {
+				s.fnLogger("protobuf encode ipc sender error: ", err)
+				continue
+			}
+
+			data <- d
+
+			if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) {
 				if s.fn != nil {
-
 					sFlag := true
-					if i.SdkDataLen < 2 {
-						sFlag = false
+					for _, v := range i.Msg.Tasklab.Sdkinfos {
+						if len(v.Sdkdata) < 2 {
+							sFlag = false
+							break
+						}
 					}
-					s.fn(i.MsgData, sFlag)
-
+					s.fn(d, sFlag)
 				}
 			}
+
 		default:
 			time.Sleep(10 * time.Millisecond)
 		}
@@ -82,8 +91,6 @@
 }
 
 func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
-
-	// go ruleserver.TimeTicker()
 
 	dataChan := make(chan []byte, 3)
 	go s.serializeProto(ctx, dataChan)
diff --git a/run.go b/run.go
index 50cf58f..86cc73c 100644
--- a/run.go
+++ b/run.go
@@ -8,8 +8,6 @@
 
 	"face/common"
 
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/gogpu.git"
 )
 
@@ -19,7 +17,7 @@
 	handle *SDKFace
 
 	maxChannel      int
-	ftrackChans     map[string]chan protomsg.SdkMessage
+	ftrackChans     map[string]chan common.MsgRS
 	ftrackChannels  map[string]int
 	ftrackChanStats []bool
 	chnLock         sync.Mutex
@@ -139,7 +137,7 @@
 		handle: handle,
 
 		maxChannel:      maxChan,
-		ftrackChans:     make(map[string]chan protomsg.SdkMessage, maxChan),
+		ftrackChans:     make(map[string]chan common.MsgRS, maxChan),
 		ftrackChannels:  make(map[string]int, maxChan),
 		ftrackChanStats: make([]bool, maxChan, maxChan),
 
@@ -173,8 +171,8 @@
 	)
 	ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
 	ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
-	chRcv := make(chan []byte, s.maxChannel)
-	chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel)
+	chRcv := make(chan common.MsgRS, s.maxChannel)
+	chSnd := make(chan common.MsgRS, s.maxChannel)
 
 	rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
 	snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
@@ -223,36 +221,33 @@
 	f.chnLock.Unlock()
 }
 
-func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
-
-	chMsg := make(chan protomsg.SdkMessage, f.maxChannel)
-	go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
+func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) {
 
 	for {
 		select {
 		case <-ctx.Done():
 			f.handle.Free()
 			return
-		case rMsg := <-chMsg:
+		case rMsg := <-in:
 			if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
 				f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
 				common.EjectResult(nil, rMsg, out)
 				continue
 			}
 
-			if _, ok := f.ftrackChans[rMsg.Cid]; ok {
+			if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok {
 				f.fnLogger("Face Cache Size: ", len(f.ftrackChans))
-				f.ftrackChans[rMsg.Cid] <- rMsg
+				f.ftrackChans[rMsg.Msg.Cid] <- rMsg
 			} else {
 
-				f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, f.maxChannel)
+				f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel)
 				chn := f.getAvailableChn()
 				if chn < 0 {
 					f.fnLogger("TOO MUCH CHANNEL")
 					common.EjectResult(nil, rMsg, out)
 					continue
 				}
-				f.ftrackChannels[rMsg.Cid] = chn
+				f.ftrackChannels[rMsg.Msg.Cid] = chn
 
 				i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
 				if i == nil {
@@ -262,9 +257,9 @@
 				// conv to bgr24 and resize
 				imgW, imgH := int(i.Width), int(i.Height)
 				ret := f.handle.TrackerResize(imgW, imgH, chn)
-				f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
-				go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn)
-				f.ftrackChans[rMsg.Cid] <- rMsg
+				f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
+				go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn)
+				f.ftrackChans[rMsg.Msg.Cid] <- rMsg
 			}
 		default:
 			time.Sleep(time.Millisecond * 100)
@@ -272,7 +267,7 @@
 	}
 }
 
-func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) {
+func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) {
 	tm := time.Now()
 	sc := 0
 	f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
@@ -301,7 +296,7 @@
 			// conv to bgr24 and resize
 			imgW, imgH := int(i.Width), int(i.Height)
 
-			f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Cid)
+			f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Msg.Cid)
 
 			count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
 
@@ -311,10 +306,10 @@
 			f.mtxRunning.Unlock()
 
 			var id, name string
-			if rMsg.Tasklab != nil {
-				id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
+			if rMsg.Msg.Tasklab != nil {
+				id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname
 			}
-			f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
+			f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
 
 			sc++
 			if sc == 25 {

--
Gitblit v1.8.0