From 61ca2454c9956ed08297af3afc73765e042eeafe Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 16 一月 2020 15:02:24 +0800
Subject: [PATCH] update
---
run.go | 189 +++++++++++++----------
/dev/null | 43 -----
go.sum | 2
rpc/recv.go | 108 +++++++++++++
go.mod | 1
rpc/send.go | 98 ++++++++++++
6 files changed, 316 insertions(+), 125 deletions(-)
diff --git a/common/helper.go b/common/helper.go
deleted file mode 100644
index dc6f7b0..0000000
--- a/common/helper.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package common
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "strconv"
-)
-
-// SubConfig sub
-type SubConfig struct {
- SoFile string `json:"so_file_path"`
- Env string `json:"runtime"`
- Param map[string]string `json:"param"`
-}
-
-// SdkConfig sdk
-type SdkConfig struct {
- SoFile string `json:"so_file_path"`
- Env string `json:"runtime"`
- Param map[string]string `json:"param"`
- Sub *SubConfig `json:"sub"`
-}
-
-// ReadConfig conf
-func ReadConfig(file string) (SdkConfig, error) {
- data, err := ioutil.ReadFile(file)
- if err != nil {
- return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
- }
-
- //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮�
- var v SdkConfig
- err = json.Unmarshal(data, &v)
-
- return v, err
-}
-
-// Atoi atoi
-func Atoi(s string) int {
- i, _ := strconv.Atoi(s)
- return i
-}
diff --git a/go.mod b/go.mod
index ee5ce65..1a7ec0c 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@
go 1.12
require (
+ basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573
basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
diff --git a/go.sum b/go.sum
index 99a54be..d9cc758 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9 h1:Pl6mztTA7ac7G/LNmViCrvM5rXAHZ7gqmG5JQOwwTs4=
+basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA=
diff --git a/rpc/recv.go b/rpc/recv.go
new file mode 100644
index 0000000..2b0ca39
--- /dev/null
+++ b/rpc/recv.go
@@ -0,0 +1,108 @@
+package rpc
+
+import (
+ "context"
+
+ "time"
+
+ "basic.com/valib/deliver.git"
+)
+
+const mode = deliver.Shm
+
+// Reciever recv from ipc
+type Reciever struct {
+ ctx context.Context
+ ipcURL string
+ out chan<- []byte
+
+ shm bool
+ fnLogger func(...interface{})
+}
+
+// NewReciever new recv
+func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+ return &Reciever{
+ ipcURL: url,
+ out: out,
+ shm: shm,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC client
+func (r *Reciever) Run(ctx context.Context) {
+
+ if r.shm {
+ r.runShm(ctx)
+ } else {
+ r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+ }
+}
+
+func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
+
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ default:
+
+ if r.shm {
+ if d, err := i.Recv(); err != nil {
+ i.Close()
+ r.fnLogger("ANALYSIS RECV ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("ANALYSIS CREATE FAILED : ", err)
+ }
+ i = c
+ r.fnLogger("ANALYSIS CREATE SHM")
+ } else {
+ if d != nil {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~shm recv image:", len(d))
+ }
+ r.out <- d
+ }
+ }
+ } else {
+ if msg, err := i.Recv(); err != nil {
+ // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
+ } else {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~mangos recv image:", len(msg))
+ }
+ r.out <- msg
+ }
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (r *Reciever) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ r.run(ctx, c)
+}
diff --git a/rpc/send.go b/rpc/send.go
new file mode 100644
index 0000000..70835d7
--- /dev/null
+++ b/rpc/send.go
@@ -0,0 +1,98 @@
+package rpc
+
+import (
+ "context"
+ "time"
+
+ "basic.com/valib/deliver.git"
+)
+
+// Sender decoder ingo
+type Sender struct {
+ ipcURL string
+ in <-chan []byte
+ shm bool
+ fn func([]byte, bool)
+
+ fnLogger func(...interface{})
+}
+
+// NewSender Sender
+func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
+ return &Sender{
+ ipcURL: ipcURL,
+ in: in,
+ shm: shm,
+ fn: nil,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC producer
+func (s *Sender) Run(ctx context.Context) {
+
+ if s.shm {
+ s.runShm(ctx)
+ } else {
+ i := deliver.NewClient(mode, s.ipcURL)
+ if i == nil {
+ s.fnLogger("sender 2 pubsub nng create error")
+ return
+ }
+ s.run(ctx, i)
+ }
+}
+
+func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ case d := <-s.in:
+
+ if s.shm {
+ if err := i.Send(d); err != nil {
+ i.Close()
+ s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ i = c
+ } else {
+
+ }
+ } else {
+ err := i.Send(d)
+ if err != nil {
+ // logo.Errorln("error sender 2 pubsub: ", err)
+ } else {
+ s.fnLogger("mangos send to pubsub len: ", len(d))
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (s *Sender) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ s.run(ctx, c)
+}
diff --git a/run.go b/run.go
index 126d96c..8b00ebf 100644
--- a/run.go
+++ b/run.go
@@ -1,25 +1,60 @@
package main
+/*
+#include <stdlib.h>
+#include <string.h>
+void* crop_image(void *vsrc, int srcW, int srcH, int x0, int y0, int x1, int y1, int channel, int *length)
+{
+ if (x0 < 0) x0 = 0;
+ if (x0 > srcW) x0 = srcW-1;
+ if (x1 < 0) x1 = 0;
+ if (x1 > srcW) x1 = srcW-1;
+
+ if (y0 < 0) y0 = 0;
+ if (y0 > srcH) y0 = srcH-1;
+ if (y1 < 0) y1 = 0;
+ if (y1 > srcH) y1 = srcH-1;
+
+ if (x1 - x0 <= 0 || y1 - y0 <= 0) return NULL;
+
+ if (x1-x0 > srcW) x1 = srcW-x0;
+ if (y1-y0 > srcH) y1 = srcH-y0;
+
+
+ unsigned char *src = (unsigned char*)vsrc;
+
+ int destW = x1 - x0 + 1;
+ int destH = y1 - y0 + 1;
+
+ *length = channel * destW * destH;
+ unsigned char * desData = (unsigned char*)malloc(*length);
+
+ int i = 0;
+ int destIdy = 0;
+
+ for (i = y0; i <= y1; i++)
+ {
+ destIdy = i - y0;
+ memcpy(&(desData[destIdy * destW * channel]), &(src[(i * srcW + x0) * channel]),sizeof(char) * channel * destW);
+ }
+
+ return desData;
+}*/
+import "C"
import (
"context"
- "io/ioutil"
- "os"
- "strings"
"time"
"unsafe"
- "reid/common"
+ "reid/rpc"
+
+ "basic.com/libgowrapper/sdkhelper.git"
"basic.com/valib/gogpu.git"
"basic.com/pubsub/protomsg.git"
"github.com/gogo/protobuf/proto"
-
- "nanomsg.org/go-mangos"
- "nanomsg.org/go-mangos/protocol/rep"
- "nanomsg.org/go-mangos/transport/ipc"
- "nanomsg.org/go-mangos/transport/tcp"
)
type reid struct {
@@ -33,7 +68,7 @@
// Create Reid
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
- cfg, err := common.ReadConfig(config)
+ cfg, err := sdkhelper.ReadConfig(config)
if err != nil {
fn("Reid SDK Create Error When Read Config: ", err)
return nil
@@ -53,7 +88,7 @@
}
}
- gpuM := common.Atoi(cfg.Param[sGPU])
+ gpuM := sdkhelper.Atoi(cfg.Param[sGPU])
rGPU := gpu
@@ -80,89 +115,79 @@
func Run(ctx context.Context, i interface{}) {
s := i.(*reid)
- var sock mangos.Socket
- var err error
- var msg []byte
+ const (
+ postPull = `_2`
+ postPush = `_1`
+ )
- for {
- if sock, err = rep.NewSocket(); err != nil {
- s.fnLogger("can't get new rep socket: ", err)
- time.Sleep(5 * time.Millisecond)
- } else {
- break
- }
- }
+ sndURL := s.ipc + postPush
+ rcvURL := s.ipc + postPull
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
+ chSnd := make(chan []byte, 3)
+ chRcv := make(chan []byte, 3)
- for {
- if err = sock.Listen(s.ipc); err != nil {
- suf := "ipc://"
- p := strings.Index(s.ipc, suf)
- if p >= 0 {
- file := s.ipc[p+len(string(suf)):]
- os.Remove(file)
- s.fnLogger("remove:", file)
- }
- s.fnLogger("can't listen on rep socket: ", err)
- time.Sleep(5 * time.Millisecond)
- } else {
- break
- }
- }
+ recv := rpc.NewReciever(rcvURL, chRcv, true, s.fnLogger)
+ go recv.Run(ctx)
+
+ chMsg := make(chan protomsg.SdkMessage, 3)
+ go sdkhelper.UnserilizeProto(ctx, chRcv, chMsg, s.fnLogger)
+
+ send := rpc.NewSender(sndURL, chSnd, true, s.fnLogger)
+ go send.Run(ctx)
for {
select {
case <-ctx.Done():
- sock.Close()
return
+ case msg := <-chMsg:
+ if len(msg.Tasklab.Sdkinfos) == 0 || int(msg.Tasklab.Index) >= len(msg.Tasklab.Sdkinfos) {
+ continue
+ }
+ i := sdkhelper.UnpackImage(msg, "reid", s.fnLogger)
+ if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
+ s.fnLogger("reid !!!!!! unpack image error")
+ continue
+ }
+
+ sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)]
+ res := &protomsg.HumanTrackResult{}
+ if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil {
+ s.fnLogger(err, " sdkinfo msg Unmarshal 澶勭悊寮傚父")
+ continue
+ }
+ for _, v := range res.Result {
+
+ var clen C.int
+ l, t, r, b := C.int(v.RcHuman.Left), C.int(v.RcHuman.Top), C.int(v.RcHuman.Right), C.int(v.RcHuman.Bottom)
+ cutImg := C.crop_image(unsafe.Pointer(&i.Data[0]), C.int(i.Width), C.int(i.Height), l, t, r, b, 3, &clen)
+ if cutImg != nil {
+ dl := int(clen)
+ data := (*[1 << 26]byte)((*[1 << 26]byte)(cutImg))[:dl:dl]
+
+ w, h := int(r-l+1), int(b-t+1)
+ v.Feature = s.handle.Extract2(unsafe.Pointer(&data[0]), w, h, 3)
+ C.free(cutImg)
+ }
+
+ }
+
+ if out, err := proto.Marshal(res); err == nil {
+ msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out
+ nMsg := protomsg.SdkMessage{}
+
+ if data, err := proto.Marshal(&nMsg); err == nil {
+ if data == nil {
+ s.fnLogger(err, " msg Marshal 澶勭悊寮傚父")
+ continue
+ }
+ chSnd <- data
+ }
+ }
default:
- if msg, err = sock.Recv(); err != nil {
- s.fnLogger("REID~~~~~~Recv From HumanTrack error: ", err)
- continue
- }
-
- i := &protomsg.Image{}
- err := proto.Unmarshal(msg, i)
- if err != nil {
- s.fnLogger("REID~~~~~~protobuf decode CameraImage error: ", err)
- continue
- }
- if i.Data == nil {
- s.fnLogger("REID~~~~~~protomsg.Image data null")
- continue
- }
- s.fnLogger("REID~~~~~~Recv Image:", len(i.Data))
-
- feat := s.handle.Extract2(unsafe.Pointer(&i.Data[0]), int(i.Width), int(i.Height), 3)
- if feat == nil {
- // feat = make([]float32, 1)
- } else {
- for k := 0; k < 3; k++ {
- s.fnLogger("REID~~~~~~extractor---human_feats------%f", feat[k+2000])
- }
- s.fnLogger("REID~~~~~~Run Reid Use GPU: ", s.gpu)
- }
- buf := float32SliceAsByteSlice(feat)
- ioutil.WriteFile("./reid-feat-byte.txt", buf, 0644)
-
- if err = sock.Send(buf); err != nil {
- s.fnLogger("REID~~~~~~Send HumanTrack error: ", err)
- }
+ time.Sleep(10 * time.Millisecond)
}
}
-}
-
-func float32SliceAsByteSlice(src []float32) []byte {
- if len(src) == 0 {
- return nil
- }
-
- l := len(src) * 4
- ptr := unsafe.Pointer(&src[0])
- return (*[1 << 26]byte)((*[1 << 26]byte)(ptr))[:l:l]
}
--
Gitblit v1.8.0