From db20b766e2020950ed64a89065c8599a2ad69ff2 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 14 一月 2020 18:01:12 +0800
Subject: [PATCH] update
---
run.go | 42 +-
go.sum | 2
common/recv.go | 116 ++++++++++
common/torule.go | 128 +++++++++++
common/helper.go | 174 +++++++++++++++
go.mod | 3
common/send.go | 141 ++++++++++++
7 files changed, 581 insertions(+), 25 deletions(-)
diff --git a/common/helper.go b/common/helper.go
new file mode 100644
index 0000000..0e016c3
--- /dev/null
+++ b/common/helper.go
@@ -0,0 +1,174 @@
+package common
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "strconv"
+ "time"
+
+ "basic.com/libgowrapper/sdkstruct.git"
+ "basic.com/pubsub/protomsg.git"
+ "basic.com/valib/deliver.git"
+ "github.com/gogo/protobuf/proto"
+)
+
+const mode = deliver.PushPull
+
+// GetIpcAddress get ipc
+func GetIpcAddress(shm bool, id string) string {
+ if shm {
+ return id
+ }
+ return `ipc:///tmp/` + id + `.ipc`
+}
+
+// SubConfig sub
+type SubConfig struct {
+ SoFile string `json:"so_file_path"`
+ Env string `json:"runtime"`
+ Param map[string]string `json:"param"`
+}
+
+// SdkConfig sdk
+type SdkConfig struct {
+ SoFile string `json:"so_file_path"`
+ Env string `json:"runtime"`
+ Param map[string]string `json:"param"`
+ Sub *SubConfig `json:"sub"`
+}
+
+// ReadConfig conf
+func ReadConfig(file string) (SdkConfig, error) {
+ data, err := ioutil.ReadFile(file)
+ if err != nil {
+ return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
+ }
+
+ //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮�
+ var v SdkConfig
+ err = json.Unmarshal(data, &v)
+
+ return v, err
+}
+
+// Atoi atoi
+func Atoi(s string) int {
+ i, _ := strconv.Atoi(s)
+ return i
+}
+
+// UnserilizeProto un
+func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-data:
+ if len(d) < 100 {
+ continue
+ }
+ msg := protomsg.SdkMessage{}
+ if err := proto.Unmarshal(d, &msg); err != nil {
+ fn(err, " msg 澶勭悊寮傚父")
+ continue
+ }
+
+ out <- msg
+
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// Msg2MsgSDK msg->msgsdk
+func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
+
+ d, err := proto.Marshal(&msg)
+ if err != nil {
+ return nil
+ }
+
+ index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
+ if index >= count {
+ return &sdkstruct.MsgSDK{
+ MsgData: d,
+ SdkCount: count,
+ SdkIndex: index,
+ SdkDataLen: 0,
+ }
+ }
+
+ return &sdkstruct.MsgSDK{
+ MsgData: d,
+ SdkCount: count,
+ SdkIndex: index,
+ SdkDataLen: len(d),
+ }
+}
+
+// EjectResult eject
+func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
+
+ if res == nil {
+ s := Msg2MsgSDK(msg)
+ if s == nil {
+ return
+ }
+ out <- *s
+ return
+ }
+
+ msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
+
+ s := Msg2MsgSDK(msg)
+ if s == nil {
+ return
+ }
+ out <- *s
+}
+
+/////////////////////////////////////////////////////////////
+
+// ValidRemoteMessage valid or not
+func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
+ if msg.Tasklab == nil {
+ fn(fnName, " recieve msg nil")
+ return false
+ }
+
+ sdkLen := len(msg.Tasklab.Sdkinfos)
+ if sdkLen == 0 {
+ fn(fnName, " has no sdk info")
+ return false
+ }
+
+ curIndex := int(msg.Tasklab.Index)
+ if curIndex < 0 || curIndex >= sdkLen {
+ fn(fnName, " tasklab index ", curIndex, " error")
+ return false
+ }
+ if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
+ fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
+ return false
+ }
+ return true
+}
+
+// UnpackImage unpack
+func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
+ // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
+ i := &protomsg.Image{}
+ err := proto.Unmarshal(msg.Data, i)
+ if err != nil {
+ fn(fnName, " protobuf decode CameraImage error: ", err.Error())
+ return nil
+ }
+ if i.Data == nil {
+ fn(fnName, " protomsg.Image data null")
+ return nil
+ }
+ return i
+}
diff --git a/common/recv.go b/common/recv.go
new file mode 100644
index 0000000..fb31433
--- /dev/null
+++ b/common/recv.go
@@ -0,0 +1,116 @@
+package common
+
+import (
+ "context"
+
+ "time"
+
+ "basic.com/valib/deliver.git"
+)
+
+// Reciever recv from ipc
+type Reciever struct {
+ ctx context.Context
+ ipcURL string
+ out chan<- []byte
+
+ shm bool
+ fnLogger func(...interface{})
+}
+
+// NewReciever new recv
+func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+ return &Reciever{
+ ipcURL: url,
+ out: out,
+ shm: shm,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC client
+func (r *Reciever) Run(ctx context.Context) {
+
+ if r.shm {
+ r.runShm(ctx)
+ } else {
+ r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+ }
+}
+
+func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
+
+ // t := time.Now()
+ // sc := 0
+
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ default:
+
+ if r.shm {
+ if d, err := i.Recv(); err != nil {
+ i.Close()
+ r.fnLogger("ANALYSIS RECV ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("ANALYSIS CREATE FAILED : ", err)
+ }
+ i = c
+ r.fnLogger("ANALYSIS CREATE SHM")
+ } else {
+ if d != nil {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~shm recv image:", len(d))
+ }
+ r.out <- d
+ }
+ }
+ } else {
+ if msg, err := i.Recv(); err != nil {
+ // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
+ } else {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~mangos recv image:", len(msg))
+ }
+ r.out <- msg
+ }
+ }
+
+ // sc++
+ // if sc == 25 {
+ // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
+ // sc = 0
+ // t = time.Now()
+ // }
+
+ }
+ }
+}
+
+func (r *Reciever) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ r.run(ctx, c)
+}
diff --git a/common/send.go b/common/send.go
new file mode 100644
index 0000000..a0e7cc4
--- /dev/null
+++ b/common/send.go
@@ -0,0 +1,141 @@
+package common
+
+import (
+ "context"
+ "time"
+
+ "basic.com/libgowrapper/sdkstruct.git"
+ "basic.com/valib/deliver.git"
+)
+
+// Sender decoder ingo
+type Sender struct {
+ ipcURL string
+ chMsg <-chan sdkstruct.MsgSDK
+ shm bool
+ fn func([]byte, bool)
+
+ fnLogger func(...interface{})
+}
+
+// ApplyCallbackFunc cb
+func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
+
+ if s.fn == nil {
+ s.fn = f
+ }
+}
+
+// NewSender Sender
+func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
+ // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
+ return &Sender{
+ ipcURL: ipcURL,
+ chMsg: chMsg,
+ shm: shm,
+ fn: nil,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC producer
+func (s *Sender) Run(ctx context.Context) {
+
+ if s.shm {
+ s.runShm(ctx)
+ } else {
+ i := deliver.NewClient(mode, s.ipcURL)
+ if i == nil {
+ s.fnLogger("sender 2 pubsub nng create error")
+ return
+ }
+ s.run(ctx, i)
+ }
+}
+
+func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
+
+ for {
+ select {
+ case <-ctx.Done():
+ s.fnLogger("stop Sender")
+ return
+ case i := <-s.chMsg:
+
+ data <- i.MsgData
+
+ if int(i.SdkIndex+1) == i.SdkCount {
+ if s.fn != nil {
+
+ sFlag := true
+ if i.SdkDataLen < 2 {
+ sFlag = false
+ }
+ s.fn(i.MsgData, sFlag)
+
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+
+ // go ruleserver.TimeTicker()
+
+ dataChan := make(chan []byte, 3)
+ go s.serializeProto(ctx, dataChan)
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ case d := <-dataChan:
+
+ if s.shm {
+ if err := i.Send(d); err != nil {
+ i.Close()
+ s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ i = c
+ } else {
+
+ }
+ } else {
+ err := i.Send(d)
+ if err != nil {
+ // logo.Errorln("error sender 2 pubsub: ", err)
+ } else {
+ s.fnLogger("mangos send to pubsub len: ", len(d))
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (s *Sender) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ s.run(ctx, c)
+}
diff --git a/common/torule.go b/common/torule.go
new file mode 100644
index 0000000..c8be086
--- /dev/null
+++ b/common/torule.go
@@ -0,0 +1,128 @@
+package common
+
+import (
+ "container/list"
+ "context"
+ "sync"
+ "time"
+
+ "basic.com/valib/deliver.git"
+ // "basic.com/pubsub/protomsg.git"
+ // "github.com/gogo/protobuf/proto"
+)
+
+type runResult struct {
+ data []byte
+ valid bool
+}
+
+// ToRule ipc
+type ToRule struct {
+ ipcURL string
+ maxSize int
+ cache *list.List
+ cv *sync.Cond
+ cond bool
+ fnLogger func(...interface{})
+}
+
+// NewToRule send to ruleprocess
+func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
+ return &ToRule{
+ ipcURL: ipcURL,
+ maxSize: maxSize,
+ cache: list.New(),
+ cv: sync.NewCond(&sync.Mutex{}),
+ cond: false,
+ fnLogger: fn,
+ }
+}
+
+// Push data
+func (t *ToRule) Push(data []byte, valid bool) {
+
+ t.cv.L.Lock()
+ result := runResult{data, valid}
+ t.cache.PushBack(result)
+ if t.cache.Len() > t.maxSize {
+ for i := 0; i < t.cache.Len(); {
+ d := t.cache.Front().Value.(runResult)
+ if d.valid == false {
+ t.cache.Remove(t.cache.Front())
+ i = i + 2
+ } else {
+ i = i + 1
+ }
+ }
+ }
+ if t.cache.Len() > t.maxSize {
+ for i := 0; i < t.cache.Len(); {
+ t.cache.Remove(t.cache.Front())
+ i = i + 2
+ }
+ }
+ // logo.Infof("push to cache count : %d\n", t.cache.Len())
+ t.cond = true
+ t.cv.Signal()
+ t.cv.L.Unlock()
+}
+
+// Run forever
+func (t *ToRule) Run(ctx context.Context) {
+
+ var i deliver.Deliver
+ var err error
+
+ for {
+ i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
+ if err != nil {
+ time.Sleep(time.Second)
+ t.fnLogger("wait create to rule ipc", err)
+ continue
+ }
+ break
+ }
+
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ var d []byte
+ t.cv.L.Lock()
+
+ for !t.cond {
+ t.cv.Wait()
+ }
+
+ for j := 0; j < 8; j++ {
+ if t.cache.Len() <= 0 {
+ break
+ }
+
+ d = t.cache.Front().Value.(runResult).data
+ if i != nil && d != nil {
+
+ err := i.Send(d)
+ if err != nil {
+ t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
+ } else {
+ count++
+ if count > 5 {
+ count = 0
+ t.fnLogger("~~~~~~SEND TO RULE CORRECT")
+ }
+ }
+ }
+ t.cache.Remove(t.cache.Front())
+ }
+
+ t.cond = false
+ t.cv.L.Unlock()
+
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index 94144bf..7c54cd5 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,9 @@
go 1.12
require (
- basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0
- basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3 // indirect
+ basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
basic.com/valib/shm.git v0.0.0-20191029034255-156e610f9bca // indirect
github.com/gogo/protobuf v1.3.1
diff --git a/go.sum b/go.sum
index 1704e67..6646224 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,3 @@
-basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b h1:CWneZPtnrpSv1SDYtB/D+kna3njXNWLExUfQ4xM3Lxc=
-basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0 h1:5NOX81GTsRLwbhnHWYU4g6jfcOynSWetmf9PlhK5eLI=
diff --git a/run.go b/run.go
index 353c8ab..50cf58f 100644
--- a/run.go
+++ b/run.go
@@ -6,7 +6,7 @@
"sync"
"time"
- "basic.com/libgowrapper/sdkhelper.git"
+ "face/common"
"basic.com/libgowrapper/sdkstruct.git"
"basic.com/pubsub/protomsg.git"
@@ -76,7 +76,7 @@
fn("Face SDK Create Error When New SDK")
return nil
}
- cfg, err := sdkhelper.ReadConfig(config)
+ cfg, err := common.ReadConfig(config)
if err != nil {
fn("Face SDK Create Error When Read Config: ", err)
return nil
@@ -104,9 +104,9 @@
}
w, h, detThrd, detNum, detAngle, propThrd, extThrd, trckInterval, trckSmpl, maxChan, gpuM :=
- 1280, 720, sdkhelper.Atoi(cfg.Param[dt]), sdkhelper.Atoi(cfg.Param[dn]), sdkhelper.Atoi(cfg.Param[da]),
- sdkhelper.Atoi(cfg.Param[pt]), sdkhelper.Atoi(cfg.Param[et]), sdkhelper.Atoi(cfg.Param[ti]), sdkhelper.Atoi(cfg.Param[ts]),
- sdkhelper.Atoi(cfg.Param[mc]), sdkhelper.Atoi(cfg.Param[gm])
+ 1280, 720, common.Atoi(cfg.Param[dt]), common.Atoi(cfg.Param[dn]), common.Atoi(cfg.Param[da]),
+ common.Atoi(cfg.Param[pt]), common.Atoi(cfg.Param[et]), common.Atoi(cfg.Param[ti]), common.Atoi(cfg.Param[ts]),
+ common.Atoi(cfg.Param[mc]), common.Atoi(cfg.Param[gm])
if detAngle > 0 {
}
@@ -171,14 +171,14 @@
postPull = `_1`
postPush = `_2`
)
- ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull)
- ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush)
+ ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
+ ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
chRcv := make(chan []byte, s.maxChannel)
chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel)
- rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
- snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
- torule := sdkhelper.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
+ rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
+ snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
+ torule := common.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
snder.ApplyCallbackFunc(torule.Push)
@@ -226,7 +226,7 @@
func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
chMsg := make(chan protomsg.SdkMessage, f.maxChannel)
- go sdkhelper.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
+ go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
for {
select {
@@ -234,9 +234,9 @@
f.handle.Free()
return
case rMsg := <-chMsg:
- if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
+ if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
- sdkhelper.EjectResult(nil, rMsg, out)
+ common.EjectResult(nil, rMsg, out)
continue
}
@@ -249,14 +249,14 @@
chn := f.getAvailableChn()
if chn < 0 {
f.fnLogger("TOO MUCH CHANNEL")
- sdkhelper.EjectResult(nil, rMsg, out)
+ common.EjectResult(nil, rMsg, out)
continue
}
f.ftrackChannels[rMsg.Cid] = chn
- i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
+ i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
if i == nil {
- sdkhelper.EjectResult(nil, rMsg, out)
+ common.EjectResult(nil, rMsg, out)
continue
}
// conv to bgr24 and resize
@@ -285,14 +285,14 @@
case rMsg := <-in:
- if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
- sdkhelper.EjectResult(nil, rMsg, out)
+ if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
+ common.EjectResult(nil, rMsg, out)
continue
}
- i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
+ i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
- sdkhelper.EjectResult(nil, rMsg, out)
+ common.EjectResult(nil, rMsg, out)
continue
}
@@ -305,7 +305,7 @@
count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
- sdkhelper.EjectResult(data, rMsg, out)
+ common.EjectResult(data, rMsg, out)
f.mtxRunning.Lock()
f.running = true
f.mtxRunning.Unlock()
--
Gitblit v1.8.0