派生自 libgowrapper/face

zhangmeng
2020-01-14 db20b766e2020950ed64a89065c8599a2ad69ff2
update
4个文件已添加
3个文件已修改
606 ■■■■■ 已修改文件
common/helper.go 174 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/recv.go 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/send.go 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/torule.go 128 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
run.go 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/helper.go
New file
@@ -0,0 +1,174 @@
package common
import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "strconv"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
const mode = deliver.PushPull
// GetIpcAddress get ipc
func GetIpcAddress(shm bool, id string) string {
    if shm {
        return id
    }
    return `ipc:///tmp/` + id + `.ipc`
}
// SubConfig sub
type SubConfig struct {
    SoFile string            `json:"so_file_path"`
    Env    string            `json:"runtime"`
    Param  map[string]string `json:"param"`
}
// SdkConfig sdk
type SdkConfig struct {
    SoFile string            `json:"so_file_path"`
    Env    string            `json:"runtime"`
    Param  map[string]string `json:"param"`
    Sub    *SubConfig        `json:"sub"`
}
// ReadConfig conf
func ReadConfig(file string) (SdkConfig, error) {
    data, err := ioutil.ReadFile(file)
    if err != nil {
        return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
    }
    //读取的数据为json格式,需要进行解码
    var v SdkConfig
    err = json.Unmarshal(data, &v)
    return v, err
}
// Atoi atoi
func Atoi(s string) int {
    i, _ := strconv.Atoi(s)
    return i
}
// UnserilizeProto un
func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            return
        case d := <-data:
            if len(d) < 100 {
                continue
            }
            msg := protomsg.SdkMessage{}
            if err := proto.Unmarshal(d, &msg); err != nil {
                fn(err, " msg 处理异常")
                continue
            }
            out <- msg
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
// Msg2MsgSDK msg->msgsdk
func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
    d, err := proto.Marshal(&msg)
    if err != nil {
        return nil
    }
    index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
    if index >= count {
        return &sdkstruct.MsgSDK{
            MsgData:    d,
            SdkCount:   count,
            SdkIndex:   index,
            SdkDataLen: 0,
        }
    }
    return &sdkstruct.MsgSDK{
        MsgData:    d,
        SdkCount:   count,
        SdkIndex:   index,
        SdkDataLen: len(d),
    }
}
// EjectResult eject
func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
    if res == nil {
        s := Msg2MsgSDK(msg)
        if s == nil {
            return
        }
        out <- *s
        return
    }
    msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
    s := Msg2MsgSDK(msg)
    if s == nil {
        return
    }
    out <- *s
}
/////////////////////////////////////////////////////////////
// ValidRemoteMessage valid or not
func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
    if msg.Tasklab == nil {
        fn(fnName, " recieve msg nil")
        return false
    }
    sdkLen := len(msg.Tasklab.Sdkinfos)
    if sdkLen == 0 {
        fn(fnName, " has no sdk info")
        return false
    }
    curIndex := int(msg.Tasklab.Index)
    if curIndex < 0 || curIndex >= sdkLen {
        fn(fnName, " tasklab index ", curIndex, " error")
        return false
    }
    if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
        fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
        return false
    }
    return true
}
// UnpackImage unpack
func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
    // 反序列化数据得到sdk入参
    i := &protomsg.Image{}
    err := proto.Unmarshal(msg.Data, i)
    if err != nil {
        fn(fnName, " protobuf decode CameraImage error: ", err.Error())
        return nil
    }
    if i.Data == nil {
        fn(fnName, " protomsg.Image data null")
        return nil
    }
    return i
}
common/recv.go
New file
@@ -0,0 +1,116 @@
package common
import (
    "context"
    "time"
    "basic.com/valib/deliver.git"
)
// Reciever recv from ipc
type Reciever struct {
    ctx    context.Context
    ipcURL string
    out    chan<- []byte
    shm      bool
    fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        out:      out,
        shm:      shm,
        fnLogger: fn,
    }
}
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewClient(mode, r.ipcURL))
    }
}
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
    // t := time.Now()
    // sc := 0
    count := 0
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        default:
            if r.shm {
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("ANALYSIS RECV ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                        r.fnLogger("ANALYSIS CREATE FAILED : ", err)
                    }
                    i = c
                    r.fnLogger("ANALYSIS CREATE SHM")
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        r.out <- d
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
                } else {
                    count++
                    if count > 10 {
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    r.out <- msg
                }
            }
            // sc++
            // if sc == 25 {
            //     logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
            //     sc = 0
            //     t = time.Now()
            // }
        }
    }
}
func (r *Reciever) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
        r.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    r.run(ctx, c)
}
common/send.go
New file
@@ -0,0 +1,141 @@
package common
import (
    "context"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/deliver.git"
)
// Sender decoder ingo
type Sender struct {
    ipcURL string
    chMsg  <-chan sdkstruct.MsgSDK
    shm    bool
    fn     func([]byte, bool)
    fnLogger func(...interface{})
}
// ApplyCallbackFunc cb
func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
    if s.fn == nil {
        s.fn = f
    }
}
// NewSender Sender
func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
    // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
    return &Sender{
        ipcURL:   ipcURL,
        chMsg:    chMsg,
        shm:      shm,
        fn:       nil,
        fnLogger: fn,
    }
}
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
    if s.shm {
        s.runShm(ctx)
    } else {
        i := deliver.NewClient(mode, s.ipcURL)
        if i == nil {
            s.fnLogger("sender 2 pubsub nng create error")
            return
        }
        s.run(ctx, i)
    }
}
func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
    for {
        select {
        case <-ctx.Done():
            s.fnLogger("stop Sender")
            return
        case i := <-s.chMsg:
            data <- i.MsgData
            if int(i.SdkIndex+1) == i.SdkCount {
                if s.fn != nil {
                    sFlag := true
                    if i.SdkDataLen < 2 {
                        sFlag = false
                    }
                    s.fn(i.MsgData, sFlag)
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
    // go ruleserver.TimeTicker()
    dataChan := make(chan []byte, 3)
    go s.serializeProto(ctx, dataChan)
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        case d := <-dataChan:
            if s.shm {
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("ANALYSIS SENDER ERROR: ", err)
                    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
                        s.fnLogger("CLIENT CREATE FAILED : ", err)
                    }
                    i = c
                } else {
                }
            } else {
                err := i.Send(d)
                if err != nil {
                    // logo.Errorln("error sender 2 pubsub: ", err)
                } else {
                    s.fnLogger("mangos send to pubsub len: ", len(d))
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (s *Sender) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
        s.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    s.run(ctx, c)
}
common/torule.go
New file
@@ -0,0 +1,128 @@
package common
import (
    "container/list"
    "context"
    "sync"
    "time"
    "basic.com/valib/deliver.git"
    // "basic.com/pubsub/protomsg.git"
    // "github.com/gogo/protobuf/proto"
)
type runResult struct {
    data  []byte
    valid bool
}
// ToRule ipc
type ToRule struct {
    ipcURL   string
    maxSize  int
    cache    *list.List
    cv       *sync.Cond
    cond     bool
    fnLogger func(...interface{})
}
// NewToRule send to ruleprocess
func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
    return &ToRule{
        ipcURL:   ipcURL,
        maxSize:  maxSize,
        cache:    list.New(),
        cv:       sync.NewCond(&sync.Mutex{}),
        cond:     false,
        fnLogger: fn,
    }
}
// Push data
func (t *ToRule) Push(data []byte, valid bool) {
    t.cv.L.Lock()
    result := runResult{data, valid}
    t.cache.PushBack(result)
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            d := t.cache.Front().Value.(runResult)
            if d.valid == false {
                t.cache.Remove(t.cache.Front())
                i = i + 2
            } else {
                i = i + 1
            }
        }
    }
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            t.cache.Remove(t.cache.Front())
            i = i + 2
        }
    }
    // logo.Infof("push to cache count : %d\n", t.cache.Len())
    t.cond = true
    t.cv.Signal()
    t.cv.L.Unlock()
}
// Run forever
func (t *ToRule) Run(ctx context.Context) {
    var i deliver.Deliver
    var err error
    for {
        i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
        if err != nil {
            time.Sleep(time.Second)
            t.fnLogger("wait create to rule ipc", err)
            continue
        }
        break
    }
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            var d []byte
            t.cv.L.Lock()
            for !t.cond {
                t.cv.Wait()
            }
            for j := 0; j < 8; j++ {
                if t.cache.Len() <= 0 {
                    break
                }
                d = t.cache.Front().Value.(runResult).data
                if i != nil && d != nil {
                    err := i.Send(d)
                    if err != nil {
                        t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
                    } else {
                        count++
                        if count > 5 {
                            count = 0
                            t.fnLogger("~~~~~~SEND TO RULE CORRECT")
                        }
                    }
                }
                t.cache.Remove(t.cache.Front())
            }
            t.cond = false
            t.cv.L.Unlock()
        }
    }
}
go.mod
@@ -3,10 +3,9 @@
go 1.12
require (
    basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0
    basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3 // indirect
    basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
    basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
    basic.com/valib/shm.git v0.0.0-20191029034255-156e610f9bca // indirect
    github.com/gogo/protobuf v1.3.1
go.sum
@@ -1,5 +1,3 @@
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b h1:CWneZPtnrpSv1SDYtB/D+kna3njXNWLExUfQ4xM3Lxc=
basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0 h1:5NOX81GTsRLwbhnHWYU4g6jfcOynSWetmf9PlhK5eLI=
run.go
@@ -6,7 +6,7 @@
    "sync"
    "time"
    "basic.com/libgowrapper/sdkhelper.git"
    "face/common"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/protomsg.git"
@@ -76,7 +76,7 @@
        fn("Face SDK Create Error When New SDK")
        return nil
    }
    cfg, err := sdkhelper.ReadConfig(config)
    cfg, err := common.ReadConfig(config)
    if err != nil {
        fn("Face SDK Create Error When Read Config: ", err)
        return nil
@@ -104,9 +104,9 @@
    }
    w, h, detThrd, detNum, detAngle, propThrd, extThrd, trckInterval, trckSmpl, maxChan, gpuM :=
        1280, 720, sdkhelper.Atoi(cfg.Param[dt]), sdkhelper.Atoi(cfg.Param[dn]), sdkhelper.Atoi(cfg.Param[da]),
        sdkhelper.Atoi(cfg.Param[pt]), sdkhelper.Atoi(cfg.Param[et]), sdkhelper.Atoi(cfg.Param[ti]), sdkhelper.Atoi(cfg.Param[ts]),
        sdkhelper.Atoi(cfg.Param[mc]), sdkhelper.Atoi(cfg.Param[gm])
        1280, 720, common.Atoi(cfg.Param[dt]), common.Atoi(cfg.Param[dn]), common.Atoi(cfg.Param[da]),
        common.Atoi(cfg.Param[pt]), common.Atoi(cfg.Param[et]), common.Atoi(cfg.Param[ti]), common.Atoi(cfg.Param[ts]),
        common.Atoi(cfg.Param[mc]), common.Atoi(cfg.Param[gm])
    if detAngle > 0 {
    }
@@ -171,14 +171,14 @@
        postPull = `_1`
        postPush = `_2`
    )
    ipcRcv := sdkhelper.GetIpcAddress(s.shm, s.id+postPull)
    ipcSnd := sdkhelper.GetIpcAddress(s.shm, s.id+postPush)
    ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
    ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
    chRcv := make(chan []byte, s.maxChannel)
    chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel)
    rcver := sdkhelper.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
    snder := sdkhelper.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
    torule := sdkhelper.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
    rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
    snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
    torule := common.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
    snder.ApplyCallbackFunc(torule.Push)
@@ -226,7 +226,7 @@
func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
    chMsg := make(chan protomsg.SdkMessage, f.maxChannel)
    go sdkhelper.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
    go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
    for {
        select {
@@ -234,9 +234,9 @@
            f.handle.Free()
            return
        case rMsg := <-chMsg:
            if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
            if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
                f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
                sdkhelper.EjectResult(nil, rMsg, out)
                common.EjectResult(nil, rMsg, out)
                continue
            }
@@ -249,14 +249,14 @@
                chn := f.getAvailableChn()
                if chn < 0 {
                    f.fnLogger("TOO MUCH CHANNEL")
                    sdkhelper.EjectResult(nil, rMsg, out)
                    common.EjectResult(nil, rMsg, out)
                    continue
                }
                f.ftrackChannels[rMsg.Cid] = chn
                i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
                i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
                if i == nil {
                    sdkhelper.EjectResult(nil, rMsg, out)
                    common.EjectResult(nil, rMsg, out)
                    continue
                }
                // conv to bgr24 and resize
@@ -285,14 +285,14 @@
        case rMsg := <-in:
            if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
                sdkhelper.EjectResult(nil, rMsg, out)
            if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
                common.EjectResult(nil, rMsg, out)
                continue
            }
            i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
            i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
            if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
                sdkhelper.EjectResult(nil, rMsg, out)
                common.EjectResult(nil, rMsg, out)
                continue
            }
@@ -305,7 +305,7 @@
            count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
            sdkhelper.EjectResult(data, rMsg, out)
            common.EjectResult(data, rMsg, out)
            f.mtxRunning.Lock()
            f.running = true
            f.mtxRunning.Unlock()