reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-01-14 5459ba1d3f7f944aa97923ed9c09a5dbc7663928
update
6个文件已添加
3个文件已修改
784 ■■■■■ 已修改文件
common/flow.go 124 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/helper.go 174 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/lockList.go 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/recv.go 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/send.go 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/torule.go 128 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
run.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/flow.go
New file
@@ -0,0 +1,124 @@
package common
import (
    "context"
    "time"
    "basic.com/pubsub/protomsg.git"
    "basic.com/libgowrapper/sdkstruct.git"
)
/////////////////////////////////////////////////////////////////
// FlowCreate create flow
func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
    const (
        postPull = `_1`
        postPush = `_2`
    )
    ipcRcv := GetIpcAddress(shm, id+postPull)
    ipcSnd := GetIpcAddress(shm, id+postPush)
    chRcv := make(chan []byte, 3)
    chSnd := make(chan sdkstruct.MsgSDK, 3)
    rcver := NewReciever(ipcRcv, chRcv, shm, fn)
    snder := NewSender(ipcSnd, chSnd, shm, fn)
    torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
    snder.ApplyCallbackFunc(torule.Push)
    go rcver.Run(ctx)
    go snder.Run(ctx)
    go torule.Run(ctx)
    return chRcv, chSnd
}
// WorkFlowSimple work
func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
    fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fn func(...interface{})) {
    tm := time.Now()
    sc := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            elems := fnConsume()
            if elems == nil || len(elems) == 0 {
                time.Sleep(10 * time.Millisecond)
                continue
            }
            var msgs []protomsg.SdkMessage
            for _, v := range elems {
                msgs = append(msgs, v.(protomsg.SdkMessage))
            }
            fnRun(msgs, out, typ)
            sc++
            if sc == 25 {
                fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
                sc = 0
                tm = time.Now()
            }
            if time.Since(tm) > time.Second {
                fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
                sc = 0
                tm = time.Now()
            }
        }
    }
}
// FlowSimple wrap
func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
    fnProduce func(interface{}), fnConsume func() []interface{},
    fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fnClose func(), fn func(...interface{})) {
    cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
        fnRun(msgs[0], ch, typ)
    }
    FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
}
// FlowBatch batch
func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
    fnProduce func(interface{}), fnConsume func() []interface{},
    fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fnClose func(), fn func(...interface{})) {
    chMsg := make(chan protomsg.SdkMessage, 3)
    go UnserilizeProto(ctx, in, chMsg, fn)
    go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
    for {
        select {
        case <-ctx.Done():
            fnClose()
            return
        case rMsg := <-chMsg:
            if !ValidRemoteMessage(rMsg, typ, fn) {
                fn(typ, " validremotemessage invalid")
                EjectResult(nil, rMsg, out)
                continue
            }
            fnProduce(rMsg)
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
common/helper.go
New file
@@ -0,0 +1,174 @@
package common
import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "strconv"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
const mode = deliver.PushPull
// GetIpcAddress get ipc
func GetIpcAddress(shm bool, id string) string {
    if shm {
        return id
    }
    return `ipc:///tmp/` + id + `.ipc`
}
// SubConfig sub
type SubConfig struct {
    SoFile string            `json:"so_file_path"`
    Env    string            `json:"runtime"`
    Param  map[string]string `json:"param"`
}
// SdkConfig sdk
type SdkConfig struct {
    SoFile string            `json:"so_file_path"`
    Env    string            `json:"runtime"`
    Param  map[string]string `json:"param"`
    Sub    *SubConfig        `json:"sub"`
}
// ReadConfig conf
func ReadConfig(file string) (SdkConfig, error) {
    data, err := ioutil.ReadFile(file)
    if err != nil {
        return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
    }
    //读取的数据为json格式,需要进行解码
    var v SdkConfig
    err = json.Unmarshal(data, &v)
    return v, err
}
// Atoi atoi
func Atoi(s string) int {
    i, _ := strconv.Atoi(s)
    return i
}
// UnserilizeProto un
func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            return
        case d := <-data:
            if len(d) < 100 {
                continue
            }
            msg := protomsg.SdkMessage{}
            if err := proto.Unmarshal(d, &msg); err != nil {
                fn(err, " msg 处理异常")
                continue
            }
            out <- msg
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
// Msg2MsgSDK msg->msgsdk
func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
    d, err := proto.Marshal(&msg)
    if err != nil {
        return nil
    }
    index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
    if index >= count {
        return &sdkstruct.MsgSDK{
            MsgData:    d,
            SdkCount:   count,
            SdkIndex:   index,
            SdkDataLen: 0,
        }
    }
    return &sdkstruct.MsgSDK{
        MsgData:    d,
        SdkCount:   count,
        SdkIndex:   index,
        SdkDataLen: len(d),
    }
}
// EjectResult eject
func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
    if res == nil {
        s := Msg2MsgSDK(msg)
        if s == nil {
            return
        }
        out <- *s
        return
    }
    msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
    s := Msg2MsgSDK(msg)
    if s == nil {
        return
    }
    out <- *s
}
/////////////////////////////////////////////////////////////
// ValidRemoteMessage valid or not
func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
    if msg.Tasklab == nil {
        fn(fnName, " recieve msg nil")
        return false
    }
    sdkLen := len(msg.Tasklab.Sdkinfos)
    if sdkLen == 0 {
        fn(fnName, " has no sdk info")
        return false
    }
    curIndex := int(msg.Tasklab.Index)
    if curIndex < 0 || curIndex >= sdkLen {
        fn(fnName, " tasklab index ", curIndex, " error")
        return false
    }
    if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
        fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
        return false
    }
    return true
}
// UnpackImage unpack
func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
    // 反序列化数据得到sdk入参
    i := &protomsg.Image{}
    err := proto.Unmarshal(msg.Data, i)
    if err != nil {
        fn(fnName, " protobuf decode CameraImage error: ", err.Error())
        return nil
    }
    if i.Data == nil {
        fn(fnName, " protomsg.Image data null")
        return nil
    }
    return i
}
common/lockList.go
New file
@@ -0,0 +1,89 @@
package common
import (
    "container/list"
    "sync"
)
// LockList list
type LockList struct {
    cache *list.List
    cv    *sync.Cond
    cond  bool
    size  int
}
// NewLockList new
func NewLockList(size int) *LockList {
    return &LockList{
        cache: list.New(),
        cv:    sync.NewCond(&sync.Mutex{}),
        cond:  false,
        size:  size,
    }
}
// Push push
func (l *LockList) Push(v interface{}) {
    l.cv.L.Lock()
    l.cache.PushBack(v)
    for l.cache.Len() > l.size {
        l.cache.Remove(l.cache.Front())
    }
    l.cond = true
    l.cv.Signal()
    l.cv.L.Unlock()
}
// Pop pop
func (l *LockList) Pop() []interface{} {
    var batch []interface{}
    l.cv.L.Lock()
    for !l.cond {
        l.cv.Wait()
    }
    elem := l.cache.Front()
    if elem != nil {
        batch = append(batch, elem.Value)
        l.cache.Remove(l.cache.Front())
    }
    l.cond = false
    l.cv.L.Unlock()
    return batch
}
// Drain drain all element
func (l *LockList) Drain() []interface{} {
    var batch []interface{}
    l.cv.L.Lock()
    for !l.cond {
        l.cv.Wait()
    }
    for {
        elem := l.cache.Front()
        if elem == nil {
            break
        }
        batch = append(batch, elem.Value)
        l.cache.Remove(l.cache.Front())
    }
    l.cond = false
    l.cv.L.Unlock()
    return batch
}
common/recv.go
New file
@@ -0,0 +1,116 @@
package common
import (
    "context"
    "time"
    "basic.com/valib/deliver.git"
)
// Reciever recv from ipc
type Reciever struct {
    ctx    context.Context
    ipcURL string
    out    chan<- []byte
    shm      bool
    fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        out:      out,
        shm:      shm,
        fnLogger: fn,
    }
}
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewClient(mode, r.ipcURL))
    }
}
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
    // t := time.Now()
    // sc := 0
    count := 0
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        default:
            if r.shm {
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("ANALYSIS RECV ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                        r.fnLogger("ANALYSIS CREATE FAILED : ", err)
                    }
                    i = c
                    r.fnLogger("ANALYSIS CREATE SHM")
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        r.out <- d
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
                } else {
                    count++
                    if count > 10 {
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    r.out <- msg
                }
            }
            // sc++
            // if sc == 25 {
            //     logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
            //     sc = 0
            //     t = time.Now()
            // }
        }
    }
}
func (r *Reciever) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
        r.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    r.run(ctx, c)
}
common/send.go
New file
@@ -0,0 +1,141 @@
package common
import (
    "context"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/deliver.git"
)
// Sender decoder ingo
type Sender struct {
    ipcURL string
    chMsg  <-chan sdkstruct.MsgSDK
    shm    bool
    fn     func([]byte, bool)
    fnLogger func(...interface{})
}
// ApplyCallbackFunc cb
func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
    if s.fn == nil {
        s.fn = f
    }
}
// NewSender Sender
func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
    // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
    return &Sender{
        ipcURL:   ipcURL,
        chMsg:    chMsg,
        shm:      shm,
        fn:       nil,
        fnLogger: fn,
    }
}
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
    if s.shm {
        s.runShm(ctx)
    } else {
        i := deliver.NewClient(mode, s.ipcURL)
        if i == nil {
            s.fnLogger("sender 2 pubsub nng create error")
            return
        }
        s.run(ctx, i)
    }
}
func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
    for {
        select {
        case <-ctx.Done():
            s.fnLogger("stop Sender")
            return
        case i := <-s.chMsg:
            data <- i.MsgData
            if int(i.SdkIndex+1) == i.SdkCount {
                if s.fn != nil {
                    sFlag := true
                    if i.SdkDataLen < 2 {
                        sFlag = false
                    }
                    s.fn(i.MsgData, sFlag)
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
    // go ruleserver.TimeTicker()
    dataChan := make(chan []byte, 3)
    go s.serializeProto(ctx, dataChan)
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        case d := <-dataChan:
            if s.shm {
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("ANALYSIS SENDER ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                        s.fnLogger("CLIENT CREATE FAILED : ", err)
                    }
                    i = c
                } else {
                }
            } else {
                err := i.Send(d)
                if err != nil {
                    // logo.Errorln("error sender 2 pubsub: ", err)
                } else {
                    s.fnLogger("mangos send to pubsub len: ", len(d))
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
        s.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    s.run(ctx, c)
}
common/torule.go
New file
@@ -0,0 +1,128 @@
package common
import (
    "container/list"
    "context"
    "sync"
    "time"
    "basic.com/valib/deliver.git"
    // "basic.com/pubsub/protomsg.git"
    // "github.com/gogo/protobuf/proto"
)
type runResult struct {
    data  []byte
    valid bool
}
// ToRule ipc
type ToRule struct {
    ipcURL   string
    maxSize  int
    cache    *list.List
    cv       *sync.Cond
    cond     bool
    fnLogger func(...interface{})
}
// NewToRule send to ruleprocess
func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
    return &ToRule{
        ipcURL:   ipcURL,
        maxSize:  maxSize,
        cache:    list.New(),
        cv:       sync.NewCond(&sync.Mutex{}),
        cond:     false,
        fnLogger: fn,
    }
}
// Push data
func (t *ToRule) Push(data []byte, valid bool) {
    t.cv.L.Lock()
    result := runResult{data, valid}
    t.cache.PushBack(result)
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            d := t.cache.Front().Value.(runResult)
            if d.valid == false {
                t.cache.Remove(t.cache.Front())
                i = i + 2
            } else {
                i = i + 1
            }
        }
    }
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            t.cache.Remove(t.cache.Front())
            i = i + 2
        }
    }
    // logo.Infof("push to cache count : %d\n", t.cache.Len())
    t.cond = true
    t.cv.Signal()
    t.cv.L.Unlock()
}
// Run forever
func (t *ToRule) Run(ctx context.Context) {
    var i deliver.Deliver
    var err error
    for {
        i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
        if err != nil {
            time.Sleep(time.Second)
            t.fnLogger("wait create to rule ipc", err)
            continue
        }
        break
    }
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            var d []byte
            t.cv.L.Lock()
            for !t.cond {
                t.cv.Wait()
            }
            for j := 0; j < 8; j++ {
                if t.cache.Len() <= 0 {
                    break
                }
                d = t.cache.Front().Value.(runResult).data
                if i != nil && d != nil {
                    err := i.Send(d)
                    if err != nil {
                        t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
                    } else {
                        count++
                        if count > 5 {
                            count = 0
                            t.fnLogger("~~~~~~SEND TO RULE CORRECT")
                        }
                    }
                }
                t.cache.Remove(t.cache.Front())
            }
            t.cond = false
            t.cv.L.Unlock()
        }
    }
}
go.mod
@@ -3,8 +3,7 @@
go 1.12
require (
    basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c // indirect
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573
    basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
    basic.com/valib/godraw.git v0.0.0-20191122082247-26e9987cd183
go.sum
@@ -1,5 +1,3 @@
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b h1:CWneZPtnrpSv1SDYtB/D+kna3njXNWLExUfQ4xM3Lxc=
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA=
run.go
@@ -10,7 +10,8 @@
    "time"
    "unsafe"
    "basic.com/libgowrapper/sdkhelper.git"
    "reid/common"
    "basic.com/valib/gogpu.git"
    "basic.com/pubsub/protomsg.git"
@@ -35,7 +36,7 @@
// Create Reid
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
    cfg, err := sdkhelper.ReadConfig(config)
    cfg, err := common.ReadConfig(config)
    if err != nil {
        fn("Reid SDK Create Error When Read Config: ", err)
        return nil
@@ -55,7 +56,7 @@
        }
    }
    gpuM := sdkhelper.Atoi(cfg.Param[sGPU])
    gpuM := common.Atoi(cfg.Param[sGPU])
    rGPU := gpu