From 61ca2454c9956ed08297af3afc73765e042eeafe Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 16 一月 2020 15:02:24 +0800 Subject: [PATCH] update --- run.go | 189 +++++++++++++---------- /dev/null | 43 ----- go.sum | 2 rpc/recv.go | 108 +++++++++++++ go.mod | 1 rpc/send.go | 98 ++++++++++++ 6 files changed, 316 insertions(+), 125 deletions(-) diff --git a/common/helper.go b/common/helper.go deleted file mode 100644 index dc6f7b0..0000000 --- a/common/helper.go +++ /dev/null @@ -1,43 +0,0 @@ -package common - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "strconv" -) - -// SubConfig sub -type SubConfig struct { - SoFile string `json:"so_file_path"` - Env string `json:"runtime"` - Param map[string]string `json:"param"` -} - -// SdkConfig sdk -type SdkConfig struct { - SoFile string `json:"so_file_path"` - Env string `json:"runtime"` - Param map[string]string `json:"param"` - Sub *SubConfig `json:"sub"` -} - -// ReadConfig conf -func ReadConfig(file string) (SdkConfig, error) { - data, err := ioutil.ReadFile(file) - if err != nil { - return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file) - } - - //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮� - var v SdkConfig - err = json.Unmarshal(data, &v) - - return v, err -} - -// Atoi atoi -func Atoi(s string) int { - i, _ := strconv.Atoi(s) - return i -} diff --git a/go.mod b/go.mod index ee5ce65..1a7ec0c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ go 1.12 require ( + basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9 basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3 diff --git a/go.sum b/go.sum index 99a54be..d9cc758 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9 h1:Pl6mztTA7ac7G/LNmViCrvM5rXAHZ7gqmG5JQOwwTs4= +basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4= basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I= basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag= basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA= diff --git a/rpc/recv.go b/rpc/recv.go new file mode 100644 index 0000000..2b0ca39 --- /dev/null +++ b/rpc/recv.go @@ -0,0 +1,108 @@ +package rpc + +import ( + "context" + + "time" + + "basic.com/valib/deliver.git" +) + +const mode = deliver.Shm + +// Reciever recv from ipc +type Reciever struct { + ctx context.Context + ipcURL string + out chan<- []byte + + shm bool + fnLogger func(...interface{}) +} + +// NewReciever new recv +func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { + return &Reciever{ + ipcURL: url, + out: out, + shm: shm, + fnLogger: fn, + } +} + +// Run run a IPC client +func (r *Reciever) Run(ctx context.Context) { + + if r.shm { + r.runShm(ctx) + } else { + r.run(ctx, deliver.NewClient(mode, r.ipcURL)) + } +} + +func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { + + count := 0 + + for { + select { + case <-ctx.Done(): + i.Close() + return + default: + + if r.shm { + if d, err := i.Recv(); err != nil { + i.Close() + r.fnLogger("ANALYSIS RECV ERROR: ", err) + + c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) + for { + if err == nil { + break + } + time.Sleep(time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) + r.fnLogger("ANALYSIS CREATE FAILED : ", err) + } + i = c + r.fnLogger("ANALYSIS CREATE SHM") + } else { + if d != nil { + count++ + if count > 10 { + count = 0 + r.fnLogger("~~~shm recv image:", len(d)) + } + r.out <- d + } + } + } else { + if msg, err := i.Recv(); err != nil { + // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) + } else { + count++ + if count > 10 { + count = 0 + r.fnLogger("~~~mangos recv image:", len(msg)) + } + r.out <- msg + } + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func (r *Reciever) runShm(ctx context.Context) { + c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) + r.fnLogger("CLIENT CREATE FAILED : ", err) + } + r.run(ctx, c) +} diff --git a/rpc/send.go b/rpc/send.go new file mode 100644 index 0000000..70835d7 --- /dev/null +++ b/rpc/send.go @@ -0,0 +1,98 @@ +package rpc + +import ( + "context" + "time" + + "basic.com/valib/deliver.git" +) + +// Sender decoder ingo +type Sender struct { + ipcURL string + in <-chan []byte + shm bool + fn func([]byte, bool) + + fnLogger func(...interface{}) +} + +// NewSender Sender +func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { + return &Sender{ + ipcURL: ipcURL, + in: in, + shm: shm, + fn: nil, + fnLogger: fn, + } +} + +// Run run a IPC producer +func (s *Sender) Run(ctx context.Context) { + + if s.shm { + s.runShm(ctx) + } else { + i := deliver.NewClient(mode, s.ipcURL) + if i == nil { + s.fnLogger("sender 2 pubsub nng create error") + return + } + s.run(ctx, i) + } +} + +func (s *Sender) run(ctx context.Context, i deliver.Deliver) { + + for { + select { + case <-ctx.Done(): + i.Close() + return + case d := <-s.in: + + if s.shm { + if err := i.Send(d); err != nil { + i.Close() + s.fnLogger("ANALYSIS SENDER ERROR: ", err) + + c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) + for { + if err == nil { + break + } + time.Sleep(time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) + s.fnLogger("CLIENT CREATE FAILED : ", err) + } + i = c + } else { + + } + } else { + err := i.Send(d) + if err != nil { + // logo.Errorln("error sender 2 pubsub: ", err) + } else { + s.fnLogger("mangos send to pubsub len: ", len(d)) + } + } + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +func (s *Sender) runShm(ctx context.Context) { + c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) + s.fnLogger("CLIENT CREATE FAILED : ", err) + } + s.run(ctx, c) +} diff --git a/run.go b/run.go index 126d96c..8b00ebf 100644 --- a/run.go +++ b/run.go @@ -1,25 +1,60 @@ package main +/* +#include <stdlib.h> +#include <string.h> +void* crop_image(void *vsrc, int srcW, int srcH, int x0, int y0, int x1, int y1, int channel, int *length) +{ + if (x0 < 0) x0 = 0; + if (x0 > srcW) x0 = srcW-1; + if (x1 < 0) x1 = 0; + if (x1 > srcW) x1 = srcW-1; + + if (y0 < 0) y0 = 0; + if (y0 > srcH) y0 = srcH-1; + if (y1 < 0) y1 = 0; + if (y1 > srcH) y1 = srcH-1; + + if (x1 - x0 <= 0 || y1 - y0 <= 0) return NULL; + + if (x1-x0 > srcW) x1 = srcW-x0; + if (y1-y0 > srcH) y1 = srcH-y0; + + + unsigned char *src = (unsigned char*)vsrc; + + int destW = x1 - x0 + 1; + int destH = y1 - y0 + 1; + + *length = channel * destW * destH; + unsigned char * desData = (unsigned char*)malloc(*length); + + int i = 0; + int destIdy = 0; + + for (i = y0; i <= y1; i++) + { + destIdy = i - y0; + memcpy(&(desData[destIdy * destW * channel]), &(src[(i * srcW + x0) * channel]),sizeof(char) * channel * destW); + } + + return desData; +}*/ +import "C" import ( "context" - "io/ioutil" - "os" - "strings" "time" "unsafe" - "reid/common" + "reid/rpc" + + "basic.com/libgowrapper/sdkhelper.git" "basic.com/valib/gogpu.git" "basic.com/pubsub/protomsg.git" "github.com/gogo/protobuf/proto" - - "nanomsg.org/go-mangos" - "nanomsg.org/go-mangos/protocol/rep" - "nanomsg.org/go-mangos/transport/ipc" - "nanomsg.org/go-mangos/transport/tcp" ) type reid struct { @@ -33,7 +68,7 @@ // Create Reid func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} { - cfg, err := common.ReadConfig(config) + cfg, err := sdkhelper.ReadConfig(config) if err != nil { fn("Reid SDK Create Error When Read Config: ", err) return nil @@ -53,7 +88,7 @@ } } - gpuM := common.Atoi(cfg.Param[sGPU]) + gpuM := sdkhelper.Atoi(cfg.Param[sGPU]) rGPU := gpu @@ -80,89 +115,79 @@ func Run(ctx context.Context, i interface{}) { s := i.(*reid) - var sock mangos.Socket - var err error - var msg []byte + const ( + postPull = `_2` + postPush = `_1` + ) - for { - if sock, err = rep.NewSocket(); err != nil { - s.fnLogger("can't get new rep socket: ", err) - time.Sleep(5 * time.Millisecond) - } else { - break - } - } + sndURL := s.ipc + postPush + rcvURL := s.ipc + postPull - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) + chSnd := make(chan []byte, 3) + chRcv := make(chan []byte, 3) - for { - if err = sock.Listen(s.ipc); err != nil { - suf := "ipc://" - p := strings.Index(s.ipc, suf) - if p >= 0 { - file := s.ipc[p+len(string(suf)):] - os.Remove(file) - s.fnLogger("remove:", file) - } - s.fnLogger("can't listen on rep socket: ", err) - time.Sleep(5 * time.Millisecond) - } else { - break - } - } + recv := rpc.NewReciever(rcvURL, chRcv, true, s.fnLogger) + go recv.Run(ctx) + + chMsg := make(chan protomsg.SdkMessage, 3) + go sdkhelper.UnserilizeProto(ctx, chRcv, chMsg, s.fnLogger) + + send := rpc.NewSender(sndURL, chSnd, true, s.fnLogger) + go send.Run(ctx) for { select { case <-ctx.Done(): - sock.Close() return + case msg := <-chMsg: + if len(msg.Tasklab.Sdkinfos) == 0 || int(msg.Tasklab.Index) >= len(msg.Tasklab.Sdkinfos) { + continue + } + i := sdkhelper.UnpackImage(msg, "reid", s.fnLogger) + if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 { + s.fnLogger("reid !!!!!! unpack image error") + continue + } + + sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)] + res := &protomsg.HumanTrackResult{} + if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil { + s.fnLogger(err, " sdkinfo msg Unmarshal 澶勭悊寮傚父") + continue + } + for _, v := range res.Result { + + var clen C.int + l, t, r, b := C.int(v.RcHuman.Left), C.int(v.RcHuman.Top), C.int(v.RcHuman.Right), C.int(v.RcHuman.Bottom) + cutImg := C.crop_image(unsafe.Pointer(&i.Data[0]), C.int(i.Width), C.int(i.Height), l, t, r, b, 3, &clen) + if cutImg != nil { + dl := int(clen) + data := (*[1 << 26]byte)((*[1 << 26]byte)(cutImg))[:dl:dl] + + w, h := int(r-l+1), int(b-t+1) + v.Feature = s.handle.Extract2(unsafe.Pointer(&data[0]), w, h, 3) + C.free(cutImg) + } + + } + + if out, err := proto.Marshal(res); err == nil { + msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out + nMsg := protomsg.SdkMessage{} + + if data, err := proto.Marshal(&nMsg); err == nil { + if data == nil { + s.fnLogger(err, " msg Marshal 澶勭悊寮傚父") + continue + } + chSnd <- data + } + } default: - if msg, err = sock.Recv(); err != nil { - s.fnLogger("REID~~~~~~Recv From HumanTrack error: ", err) - continue - } - - i := &protomsg.Image{} - err := proto.Unmarshal(msg, i) - if err != nil { - s.fnLogger("REID~~~~~~protobuf decode CameraImage error: ", err) - continue - } - if i.Data == nil { - s.fnLogger("REID~~~~~~protomsg.Image data null") - continue - } - s.fnLogger("REID~~~~~~Recv Image:", len(i.Data)) - - feat := s.handle.Extract2(unsafe.Pointer(&i.Data[0]), int(i.Width), int(i.Height), 3) - if feat == nil { - // feat = make([]float32, 1) - } else { - for k := 0; k < 3; k++ { - s.fnLogger("REID~~~~~~extractor---human_feats------%f", feat[k+2000]) - } - s.fnLogger("REID~~~~~~Run Reid Use GPU: ", s.gpu) - } - buf := float32SliceAsByteSlice(feat) - ioutil.WriteFile("./reid-feat-byte.txt", buf, 0644) - - if err = sock.Send(buf); err != nil { - s.fnLogger("REID~~~~~~Send HumanTrack error: ", err) - } + time.Sleep(10 * time.Millisecond) } } -} - -func float32SliceAsByteSlice(src []float32) []byte { - if len(src) == 0 { - return nil - } - - l := len(src) * 4 - ptr := unsafe.Pointer(&src[0]) - return (*[1 << 26]byte)((*[1 << 26]byte)(ptr))[:l:l] } -- Gitblit v1.8.0