From 61ca2454c9956ed08297af3afc73765e042eeafe Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 16 一月 2020 15:02:24 +0800
Subject: [PATCH] update

---
 run.go      |  189 +++++++++++++----------
 /dev/null   |   43 -----
 go.sum      |    2 
 rpc/recv.go |  108 +++++++++++++
 go.mod      |    1 
 rpc/send.go |   98 ++++++++++++
 6 files changed, 316 insertions(+), 125 deletions(-)

diff --git a/common/helper.go b/common/helper.go
deleted file mode 100644
index dc6f7b0..0000000
--- a/common/helper.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package common
-
-import (
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"strconv"
-)
-
-// SubConfig sub
-type SubConfig struct {
-	SoFile string            `json:"so_file_path"`
-	Env    string            `json:"runtime"`
-	Param  map[string]string `json:"param"`
-}
-
-// SdkConfig sdk
-type SdkConfig struct {
-	SoFile string            `json:"so_file_path"`
-	Env    string            `json:"runtime"`
-	Param  map[string]string `json:"param"`
-	Sub    *SubConfig        `json:"sub"`
-}
-
-// ReadConfig conf
-func ReadConfig(file string) (SdkConfig, error) {
-	data, err := ioutil.ReadFile(file)
-	if err != nil {
-		return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
-	}
-
-	//璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮�
-	var v SdkConfig
-	err = json.Unmarshal(data, &v)
-
-	return v, err
-}
-
-// Atoi atoi
-func Atoi(s string) int {
-	i, _ := strconv.Atoi(s)
-	return i
-}
diff --git a/go.mod b/go.mod
index ee5ce65..1a7ec0c 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@
 go 1.12
 
 require (
+	basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9
 	basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
 	basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573
 	basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
diff --git a/go.sum b/go.sum
index 99a54be..d9cc758 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9 h1:Pl6mztTA7ac7G/LNmViCrvM5rXAHZ7gqmG5JQOwwTs4=
+basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
 basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
 basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
 basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA=
diff --git a/rpc/recv.go b/rpc/recv.go
new file mode 100644
index 0000000..2b0ca39
--- /dev/null
+++ b/rpc/recv.go
@@ -0,0 +1,108 @@
+package rpc
+
+import (
+	"context"
+
+	"time"
+
+	"basic.com/valib/deliver.git"
+)
+
+const mode = deliver.Shm
+
+// Reciever recv from ipc
+type Reciever struct {
+	ctx    context.Context
+	ipcURL string
+	out    chan<- []byte
+
+	shm      bool
+	fnLogger func(...interface{})
+}
+
+// NewReciever new recv
+func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+	return &Reciever{
+		ipcURL:   url,
+		out:      out,
+		shm:      shm,
+		fnLogger: fn,
+	}
+}
+
+// Run run a IPC client
+func (r *Reciever) Run(ctx context.Context) {
+
+	if r.shm {
+		r.runShm(ctx)
+	} else {
+		r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+	}
+}
+
+func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
+
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			i.Close()
+			return
+		default:
+
+			if r.shm {
+				if d, err := i.Recv(); err != nil {
+					i.Close()
+					r.fnLogger("ANALYSIS RECV ERROR: ", err)
+
+					c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+					for {
+						if err == nil {
+							break
+						}
+						time.Sleep(time.Second)
+						c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+						r.fnLogger("ANALYSIS CREATE FAILED : ", err)
+					}
+					i = c
+					r.fnLogger("ANALYSIS CREATE SHM")
+				} else {
+					if d != nil {
+						count++
+						if count > 10 {
+							count = 0
+							r.fnLogger("~~~shm recv image:", len(d))
+						}
+						r.out <- d
+					}
+				}
+			} else {
+				if msg, err := i.Recv(); err != nil {
+					// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
+				} else {
+					count++
+					if count > 10 {
+						count = 0
+						r.fnLogger("~~~mangos recv image:", len(msg))
+					}
+					r.out <- msg
+				}
+			}
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
+func (r *Reciever) runShm(ctx context.Context) {
+	c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+	for {
+		if err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+		c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+		r.fnLogger("CLIENT CREATE FAILED : ", err)
+	}
+	r.run(ctx, c)
+}
diff --git a/rpc/send.go b/rpc/send.go
new file mode 100644
index 0000000..70835d7
--- /dev/null
+++ b/rpc/send.go
@@ -0,0 +1,98 @@
+package rpc
+
+import (
+	"context"
+	"time"
+
+	"basic.com/valib/deliver.git"
+)
+
+// Sender decoder ingo
+type Sender struct {
+	ipcURL string
+	in     <-chan []byte
+	shm    bool
+	fn     func([]byte, bool)
+
+	fnLogger func(...interface{})
+}
+
+// NewSender Sender
+func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
+	return &Sender{
+		ipcURL:   ipcURL,
+		in:       in,
+		shm:      shm,
+		fn:       nil,
+		fnLogger: fn,
+	}
+}
+
+// Run run a IPC producer
+func (s *Sender) Run(ctx context.Context) {
+
+	if s.shm {
+		s.runShm(ctx)
+	} else {
+		i := deliver.NewClient(mode, s.ipcURL)
+		if i == nil {
+			s.fnLogger("sender 2 pubsub nng create error")
+			return
+		}
+		s.run(ctx, i)
+	}
+}
+
+func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+
+	for {
+		select {
+		case <-ctx.Done():
+			i.Close()
+			return
+		case d := <-s.in:
+
+			if s.shm {
+				if err := i.Send(d); err != nil {
+					i.Close()
+					s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+
+					c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+					for {
+						if err == nil {
+							break
+						}
+						time.Sleep(time.Second)
+						c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+						s.fnLogger("CLIENT CREATE FAILED : ", err)
+					}
+					i = c
+				} else {
+
+				}
+			} else {
+				err := i.Send(d)
+				if err != nil {
+					// logo.Errorln("error sender 2 pubsub: ", err)
+				} else {
+					s.fnLogger("mangos send to pubsub len: ", len(d))
+				}
+			}
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
+func (s *Sender) runShm(ctx context.Context) {
+	c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+	for {
+		if err == nil {
+			break
+		}
+		time.Sleep(1 * time.Second)
+		c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+		s.fnLogger("CLIENT CREATE FAILED : ", err)
+	}
+	s.run(ctx, c)
+}
diff --git a/run.go b/run.go
index 126d96c..8b00ebf 100644
--- a/run.go
+++ b/run.go
@@ -1,25 +1,60 @@
 package main
 
+/*
+#include <stdlib.h>
+#include <string.h>
+void* crop_image(void *vsrc, int srcW, int srcH, int x0, int y0, int x1, int y1, int channel, int *length)
+{
+	if (x0 < 0) x0 = 0;
+	if (x0 > srcW) x0 = srcW-1;
+	if (x1 < 0) x1 = 0;
+	if (x1 > srcW) x1 = srcW-1;
+
+	if (y0 < 0) y0 = 0;
+	if (y0 > srcH) y0 = srcH-1;
+	if (y1 < 0) y1 = 0;
+	if (y1 > srcH) y1 = srcH-1;
+
+	if (x1 - x0 <= 0 || y1 - y0 <= 0) return NULL;
+
+	if (x1-x0 > srcW) x1 = srcW-x0;
+	if (y1-y0 > srcH) y1 = srcH-y0;
+
+
+    unsigned char *src = (unsigned char*)vsrc;
+
+    int destW = x1 - x0 + 1;
+	int destH = y1 - y0 + 1;
+
+	*length = channel * destW * destH;
+	unsigned char * desData = (unsigned char*)malloc(*length);
+
+    int i = 0;
+    int destIdy = 0;
+
+    for (i = y0; i <= y1; i++)
+    {
+        destIdy = i - y0;
+        memcpy(&(desData[destIdy * destW * channel]), &(src[(i * srcW + x0) * channel]),sizeof(char) * channel * destW);
+    }
+
+	return desData;
+}*/
+import "C"
 import (
 	"context"
-	"io/ioutil"
-	"os"
-	"strings"
 	"time"
 	"unsafe"
 
-	"reid/common"
+	"reid/rpc"
+
+	"basic.com/libgowrapper/sdkhelper.git"
 
 	"basic.com/valib/gogpu.git"
 
 	"basic.com/pubsub/protomsg.git"
 
 	"github.com/gogo/protobuf/proto"
-
-	"nanomsg.org/go-mangos"
-	"nanomsg.org/go-mangos/protocol/rep"
-	"nanomsg.org/go-mangos/transport/ipc"
-	"nanomsg.org/go-mangos/transport/tcp"
 )
 
 type reid struct {
@@ -33,7 +68,7 @@
 // Create Reid
 func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
 
-	cfg, err := common.ReadConfig(config)
+	cfg, err := sdkhelper.ReadConfig(config)
 	if err != nil {
 		fn("Reid SDK Create Error When Read Config: ", err)
 		return nil
@@ -53,7 +88,7 @@
 		}
 	}
 
-	gpuM := common.Atoi(cfg.Param[sGPU])
+	gpuM := sdkhelper.Atoi(cfg.Param[sGPU])
 
 	rGPU := gpu
 
@@ -80,89 +115,79 @@
 func Run(ctx context.Context, i interface{}) {
 	s := i.(*reid)
 
-	var sock mangos.Socket
-	var err error
-	var msg []byte
+	const (
+		postPull = `_2`
+		postPush = `_1`
+	)
 
-	for {
-		if sock, err = rep.NewSocket(); err != nil {
-			s.fnLogger("can't get new rep socket: ", err)
-			time.Sleep(5 * time.Millisecond)
-		} else {
-			break
-		}
-	}
+	sndURL := s.ipc + postPush
+	rcvURL := s.ipc + postPull
 
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
+	chSnd := make(chan []byte, 3)
+	chRcv := make(chan []byte, 3)
 
-	for {
-		if err = sock.Listen(s.ipc); err != nil {
-			suf := "ipc://"
-			p := strings.Index(s.ipc, suf)
-			if p >= 0 {
-				file := s.ipc[p+len(string(suf)):]
-				os.Remove(file)
-				s.fnLogger("remove:", file)
-			}
-			s.fnLogger("can't listen on rep socket: ", err)
-			time.Sleep(5 * time.Millisecond)
-		} else {
-			break
-		}
-	}
+	recv := rpc.NewReciever(rcvURL, chRcv, true, s.fnLogger)
+	go recv.Run(ctx)
+
+	chMsg := make(chan protomsg.SdkMessage, 3)
+	go sdkhelper.UnserilizeProto(ctx, chRcv, chMsg, s.fnLogger)
+
+	send := rpc.NewSender(sndURL, chSnd, true, s.fnLogger)
+	go send.Run(ctx)
 
 	for {
 		select {
 		case <-ctx.Done():
-			sock.Close()
 			return
+		case msg := <-chMsg:
+			if len(msg.Tasklab.Sdkinfos) == 0 || int(msg.Tasklab.Index) >= len(msg.Tasklab.Sdkinfos) {
+				continue
+			}
+			i := sdkhelper.UnpackImage(msg, "reid", s.fnLogger)
+			if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
+				s.fnLogger("reid !!!!!! unpack image error")
+				continue
+			}
+
+			sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)]
+			res := &protomsg.HumanTrackResult{}
+			if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil {
+				s.fnLogger(err, " sdkinfo msg Unmarshal 澶勭悊寮傚父")
+				continue
+			}
+			for _, v := range res.Result {
+
+				var clen C.int
+				l, t, r, b := C.int(v.RcHuman.Left), C.int(v.RcHuman.Top), C.int(v.RcHuman.Right), C.int(v.RcHuman.Bottom)
+				cutImg := C.crop_image(unsafe.Pointer(&i.Data[0]), C.int(i.Width), C.int(i.Height), l, t, r, b, 3, &clen)
+				if cutImg != nil {
+					dl := int(clen)
+					data := (*[1 << 26]byte)((*[1 << 26]byte)(cutImg))[:dl:dl]
+
+					w, h := int(r-l+1), int(b-t+1)
+					v.Feature = s.handle.Extract2(unsafe.Pointer(&data[0]), w, h, 3)
+					C.free(cutImg)
+				}
+
+			}
+
+			if out, err := proto.Marshal(res); err == nil {
+				msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out
+				nMsg := protomsg.SdkMessage{}
+
+				if data, err := proto.Marshal(&nMsg); err == nil {
+					if data == nil {
+						s.fnLogger(err, " msg Marshal 澶勭悊寮傚父")
+						continue
+					}
+					chSnd <- data
+				}
+			}
 		default:
 
-			if msg, err = sock.Recv(); err != nil {
-				s.fnLogger("REID~~~~~~Recv From HumanTrack error: ", err)
-				continue
-			}
-
-			i := &protomsg.Image{}
-			err := proto.Unmarshal(msg, i)
-			if err != nil {
-				s.fnLogger("REID~~~~~~protobuf decode CameraImage error: ", err)
-				continue
-			}
-			if i.Data == nil {
-				s.fnLogger("REID~~~~~~protomsg.Image data null")
-				continue
-			}
-			s.fnLogger("REID~~~~~~Recv Image:", len(i.Data))
-
-			feat := s.handle.Extract2(unsafe.Pointer(&i.Data[0]), int(i.Width), int(i.Height), 3)
-			if feat == nil {
-				// feat = make([]float32, 1)
-			} else {
-				for k := 0; k < 3; k++ {
-					s.fnLogger("REID~~~~~~extractor---human_feats------%f", feat[k+2000])
-				}
-				s.fnLogger("REID~~~~~~Run Reid Use GPU: ", s.gpu)
-			}
-			buf := float32SliceAsByteSlice(feat)
-			ioutil.WriteFile("./reid-feat-byte.txt", buf, 0644)
-
-			if err = sock.Send(buf); err != nil {
-				s.fnLogger("REID~~~~~~Send HumanTrack error: ", err)
-			}
+			time.Sleep(10 * time.Millisecond)
 
 		}
 
 	}
-}
-
-func float32SliceAsByteSlice(src []float32) []byte {
-	if len(src) == 0 {
-		return nil
-	}
-
-	l := len(src) * 4
-	ptr := unsafe.Pointer(&src[0])
-	return (*[1 << 26]byte)((*[1 << 26]byte)(ptr))[:l:l]
 }

--
Gitblit v1.8.0