reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-01-16 61ca2454c9956ed08297af3afc73765e042eeafe
update
1个文件已删除
2个文件已添加
3个文件已修改
441 ■■■■ 已修改文件
common/helper.go 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rpc/recv.go 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rpc/send.go 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
run.go 189 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/helper.go
File was deleted
go.mod
@@ -3,6 +3,7 @@
go 1.12
require (
    basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573
    basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
go.sum
@@ -1,3 +1,5 @@
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9 h1:Pl6mztTA7ac7G/LNmViCrvM5rXAHZ7gqmG5JQOwwTs4=
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200116020405-fac8baf7b0e9/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA=
rpc/recv.go
New file
@@ -0,0 +1,108 @@
package rpc
import (
    "context"
    "time"
    "basic.com/valib/deliver.git"
)
const mode = deliver.Shm
// Reciever recv from ipc
type Reciever struct {
    ctx    context.Context
    ipcURL string
    out    chan<- []byte
    shm      bool
    fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        out:      out,
        shm:      shm,
        fnLogger: fn,
    }
}
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewClient(mode, r.ipcURL))
    }
}
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
    count := 0
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        default:
            if r.shm {
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("ANALYSIS RECV ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                        r.fnLogger("ANALYSIS CREATE FAILED : ", err)
                    }
                    i = c
                    r.fnLogger("ANALYSIS CREATE SHM")
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        r.out <- d
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
                } else {
                    count++
                    if count > 10 {
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    r.out <- msg
                }
            }
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (r *Reciever) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
        r.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    r.run(ctx, c)
}
rpc/send.go
New file
@@ -0,0 +1,98 @@
package rpc
import (
    "context"
    "time"
    "basic.com/valib/deliver.git"
)
// Sender decoder ingo
type Sender struct {
    ipcURL string
    in     <-chan []byte
    shm    bool
    fn     func([]byte, bool)
    fnLogger func(...interface{})
}
// NewSender Sender
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
    return &Sender{
        ipcURL:   ipcURL,
        in:       in,
        shm:      shm,
        fn:       nil,
        fnLogger: fn,
    }
}
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
    if s.shm {
        s.runShm(ctx)
    } else {
        i := deliver.NewClient(mode, s.ipcURL)
        if i == nil {
            s.fnLogger("sender 2 pubsub nng create error")
            return
        }
        s.run(ctx, i)
    }
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        case d := <-s.in:
            if s.shm {
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("ANALYSIS SENDER ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                        s.fnLogger("CLIENT CREATE FAILED : ", err)
                    }
                    i = c
                } else {
                }
            } else {
                err := i.Send(d)
                if err != nil {
                    // logo.Errorln("error sender 2 pubsub: ", err)
                } else {
                    s.fnLogger("mangos send to pubsub len: ", len(d))
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
        s.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    s.run(ctx, c)
}
run.go
@@ -1,25 +1,60 @@
package main
/*
#include <stdlib.h>
#include <string.h>
void* crop_image(void *vsrc, int srcW, int srcH, int x0, int y0, int x1, int y1, int channel, int *length)
{
    if (x0 < 0) x0 = 0;
    if (x0 > srcW) x0 = srcW-1;
    if (x1 < 0) x1 = 0;
    if (x1 > srcW) x1 = srcW-1;
    if (y0 < 0) y0 = 0;
    if (y0 > srcH) y0 = srcH-1;
    if (y1 < 0) y1 = 0;
    if (y1 > srcH) y1 = srcH-1;
    if (x1 - x0 <= 0 || y1 - y0 <= 0) return NULL;
    if (x1-x0 > srcW) x1 = srcW-x0;
    if (y1-y0 > srcH) y1 = srcH-y0;
    unsigned char *src = (unsigned char*)vsrc;
    int destW = x1 - x0 + 1;
    int destH = y1 - y0 + 1;
    *length = channel * destW * destH;
    unsigned char * desData = (unsigned char*)malloc(*length);
    int i = 0;
    int destIdy = 0;
    for (i = y0; i <= y1; i++)
    {
        destIdy = i - y0;
        memcpy(&(desData[destIdy * destW * channel]), &(src[(i * srcW + x0) * channel]),sizeof(char) * channel * destW);
    }
    return desData;
}*/
import "C"
import (
    "context"
    "io/ioutil"
    "os"
    "strings"
    "time"
    "unsafe"
    "reid/common"
    "reid/rpc"
    "basic.com/libgowrapper/sdkhelper.git"
    "basic.com/valib/gogpu.git"
    "basic.com/pubsub/protomsg.git"
    "github.com/gogo/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
type reid struct {
@@ -33,7 +68,7 @@
// Create Reid
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
    cfg, err := common.ReadConfig(config)
    cfg, err := sdkhelper.ReadConfig(config)
    if err != nil {
        fn("Reid SDK Create Error When Read Config: ", err)
        return nil
@@ -53,7 +88,7 @@
        }
    }
    gpuM := common.Atoi(cfg.Param[sGPU])
    gpuM := sdkhelper.Atoi(cfg.Param[sGPU])
    rGPU := gpu
@@ -80,89 +115,79 @@
func Run(ctx context.Context, i interface{}) {
    s := i.(*reid)
    var sock mangos.Socket
    var err error
    var msg []byte
    const (
        postPull = `_2`
        postPush = `_1`
    )
    for {
        if sock, err = rep.NewSocket(); err != nil {
            s.fnLogger("can't get new rep socket: ", err)
            time.Sleep(5 * time.Millisecond)
        } else {
            break
        }
    }
    sndURL := s.ipc + postPush
    rcvURL := s.ipc + postPull
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    chSnd := make(chan []byte, 3)
    chRcv := make(chan []byte, 3)
    for {
        if err = sock.Listen(s.ipc); err != nil {
            suf := "ipc://"
            p := strings.Index(s.ipc, suf)
            if p >= 0 {
                file := s.ipc[p+len(string(suf)):]
                os.Remove(file)
                s.fnLogger("remove:", file)
            }
            s.fnLogger("can't listen on rep socket: ", err)
            time.Sleep(5 * time.Millisecond)
        } else {
            break
        }
    }
    recv := rpc.NewReciever(rcvURL, chRcv, true, s.fnLogger)
    go recv.Run(ctx)
    chMsg := make(chan protomsg.SdkMessage, 3)
    go sdkhelper.UnserilizeProto(ctx, chRcv, chMsg, s.fnLogger)
    send := rpc.NewSender(sndURL, chSnd, true, s.fnLogger)
    go send.Run(ctx)
    for {
        select {
        case <-ctx.Done():
            sock.Close()
            return
        case msg := <-chMsg:
            if len(msg.Tasklab.Sdkinfos) == 0 || int(msg.Tasklab.Index) >= len(msg.Tasklab.Sdkinfos) {
                continue
            }
            i := sdkhelper.UnpackImage(msg, "reid", s.fnLogger)
            if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
                s.fnLogger("reid !!!!!! unpack image error")
                continue
            }
            sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)]
            res := &protomsg.HumanTrackResult{}
            if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil {
                s.fnLogger(err, " sdkinfo msg Unmarshal 处理异常")
                continue
            }
            for _, v := range res.Result {
                var clen C.int
                l, t, r, b := C.int(v.RcHuman.Left), C.int(v.RcHuman.Top), C.int(v.RcHuman.Right), C.int(v.RcHuman.Bottom)
                cutImg := C.crop_image(unsafe.Pointer(&i.Data[0]), C.int(i.Width), C.int(i.Height), l, t, r, b, 3, &clen)
                if cutImg != nil {
                    dl := int(clen)
                    data := (*[1 << 26]byte)((*[1 << 26]byte)(cutImg))[:dl:dl]
                    w, h := int(r-l+1), int(b-t+1)
                    v.Feature = s.handle.Extract2(unsafe.Pointer(&data[0]), w, h, 3)
                    C.free(cutImg)
                }
            }
            if out, err := proto.Marshal(res); err == nil {
                msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out
                nMsg := protomsg.SdkMessage{}
                if data, err := proto.Marshal(&nMsg); err == nil {
                    if data == nil {
                        s.fnLogger(err, " msg Marshal 处理异常")
                        continue
                    }
                    chSnd <- data
                }
            }
        default:
            if msg, err = sock.Recv(); err != nil {
                s.fnLogger("REID~~~~~~Recv From HumanTrack error: ", err)
                continue
            }
            i := &protomsg.Image{}
            err := proto.Unmarshal(msg, i)
            if err != nil {
                s.fnLogger("REID~~~~~~protobuf decode CameraImage error: ", err)
                continue
            }
            if i.Data == nil {
                s.fnLogger("REID~~~~~~protomsg.Image data null")
                continue
            }
            s.fnLogger("REID~~~~~~Recv Image:", len(i.Data))
            feat := s.handle.Extract2(unsafe.Pointer(&i.Data[0]), int(i.Width), int(i.Height), 3)
            if feat == nil {
                // feat = make([]float32, 1)
            } else {
                for k := 0; k < 3; k++ {
                    s.fnLogger("REID~~~~~~extractor---human_feats------%f", feat[k+2000])
                }
                s.fnLogger("REID~~~~~~Run Reid Use GPU: ", s.gpu)
            }
            buf := float32SliceAsByteSlice(feat)
            ioutil.WriteFile("./reid-feat-byte.txt", buf, 0644)
            if err = sock.Send(buf); err != nil {
                s.fnLogger("REID~~~~~~Send HumanTrack error: ", err)
            }
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func float32SliceAsByteSlice(src []float32) []byte {
    if len(src) == 0 {
        return nil
    }
    l := len(src) * 4
    ptr := unsafe.Pointer(&src[0])
    return (*[1 << 26]byte)((*[1 << 26]byte)(ptr))[:l:l]
}