派生自 libgowrapper/face

zhangmeng
2020-01-15 d85f3edab0d8c495cecd7a81f31a9ead1eb001ac
copy from bgr-2-analysis
4个文件已修改
213 ■■■■■ 已修改文件
common/helper.go 95 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/recv.go 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/send.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
run.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/helper.go
@@ -1,20 +1,22 @@
package common
import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "strconv"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
const mode = deliver.PushPull
// MsgRS msg recv and snd
type MsgRS struct {
    Msg protomsg.SdkMessage
}
// GetIpcAddress get ipc
func GetIpcAddress(shm bool, id string) string {
@@ -59,109 +61,56 @@
    return i
}
// UnserilizeProto un
func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            return
        case d := <-data:
            if len(d) < 100 {
                continue
            }
            msg := protomsg.SdkMessage{}
            if err := proto.Unmarshal(d, &msg); err != nil {
                fn(err, " msg 处理异常")
                continue
            }
            out <- msg
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
// Msg2MsgSDK msg->msgsdk
func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
    d, err := proto.Marshal(&msg)
    if err != nil {
        return nil
    }
    index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
    if index >= count {
        return &sdkstruct.MsgSDK{
            MsgData:    d,
            SdkCount:   count,
            SdkIndex:   index,
            SdkDataLen: 0,
        }
    }
    return &sdkstruct.MsgSDK{
        MsgData:    d,
        SdkCount:   count,
        SdkIndex:   index,
        SdkDataLen: len(d),
    }
}
// EjectResult eject
func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
func EjectResult(res []byte, msg MsgRS, out chan<- MsgRS) {
    if res == nil {
        s := Msg2MsgSDK(msg)
        if s == nil {
            return
        }
        out <- *s
        out <- msg
        return
    }
    index := int(msg.Msg.Tasklab.Index)
    if index >= len(msg.Msg.Tasklab.Sdkinfos) {
        return
    }
    msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
    msg.Msg.Tasklab.Sdkinfos[index].Sdkdata = res
    s := Msg2MsgSDK(msg)
    if s == nil {
        return
    }
    out <- *s
    out <- msg
}
/////////////////////////////////////////////////////////////
// ValidRemoteMessage valid or not
func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
    if msg.Tasklab == nil {
func ValidRemoteMessage(msg MsgRS, fnName string, fn func(...interface{})) bool {
    if msg.Msg.Tasklab == nil {
        fn(fnName, " recieve msg nil")
        return false
    }
    sdkLen := len(msg.Tasklab.Sdkinfos)
    sdkLen := len(msg.Msg.Tasklab.Sdkinfos)
    if sdkLen == 0 {
        fn(fnName, " has no sdk info")
        return false
    }
    curIndex := int(msg.Tasklab.Index)
    curIndex := int(msg.Msg.Tasklab.Index)
    if curIndex < 0 || curIndex >= sdkLen {
        fn(fnName, " tasklab index ", curIndex, " error")
        return false
    }
    if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
        fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
    if msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
        fn(fnName, " is different from ", msg.Msg.Tasklab.Sdkinfos[curIndex].Sdktype)
        return false
    }
    return true
}
// UnpackImage unpack
func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
func UnpackImage(msg MsgRS, fnName string, fn func(...interface{})) *protomsg.Image {
    // 反序列化数据得到sdk入参
    i := &protomsg.Image{}
    err := proto.Unmarshal(msg.Data, i)
    err := proto.Unmarshal(msg.Msg.Data, i)
    if err != nil {
        fn(fnName, " protobuf decode CameraImage error: ", err.Error())
        return nil
common/recv.go
@@ -5,26 +5,50 @@
    "time"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
// Reciever recv from ipc
type Reciever struct {
    ctx    context.Context
    ipcURL string
    out    chan<- []byte
    chMsg  chan<- MsgRS
    shm      bool
    fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        out:      out,
        chMsg:    chMsg,
        shm:      shm,
        fnLogger: fn,
    }
}
func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            d := <-data
            if len(d) < 100 {
                continue
            }
            // logo.Infoln(len(d), "reciver数据")
            msg := protomsg.SdkMessage{}
            if err := proto.Unmarshal(d, &msg); err != nil {
                r.fnLogger(err, " msg 处理异常")
                continue
            }
            outMsg := MsgRS{Msg: msg}
            r.chMsg <- outMsg
        }
    }
}
@@ -40,8 +64,9 @@
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
    // t := time.Now()
    // sc := 0
    dataChan := make(chan []byte, 3)
    go r.unserilizeProto(ctx, dataChan)
    count := 0
@@ -75,7 +100,7 @@
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        r.out <- d
                        dataChan <- d
                    }
                }
            } else {
@@ -87,16 +112,9 @@
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    r.out <- msg
                    dataChan <- msg
                }
            }
            // sc++
            // if sc == 25 {
            //     logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
            //     sc = 0
            //     t = time.Now()
            // }
        }
    }
common/send.go
@@ -4,14 +4,14 @@
    "context"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
// Sender decoder ingo
type Sender struct {
    ipcURL string
    chMsg  <-chan sdkstruct.MsgSDK
    chMsg  <-chan MsgRS
    shm    bool
    fn     func([]byte, bool)
@@ -27,7 +27,7 @@
}
// NewSender Sender
func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool, fn func(...interface{})) *Sender {
    // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
    return &Sender{
        ipcURL:   ipcURL,
@@ -62,19 +62,28 @@
            return
        case i := <-s.chMsg:
            data <- i.MsgData
            d, err := proto.Marshal(&i.Msg)
            if int(i.SdkIndex+1) == i.SdkCount {
            if err != nil {
                s.fnLogger("protobuf encode ipc sender error: ", err)
                continue
            }
            data <- d
            if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) {
                if s.fn != nil {
                    sFlag := true
                    if i.SdkDataLen < 2 {
                        sFlag = false
                    for _, v := range i.Msg.Tasklab.Sdkinfos {
                        if len(v.Sdkdata) < 2 {
                            sFlag = false
                            break
                        }
                    }
                    s.fn(i.MsgData, sFlag)
                    s.fn(d, sFlag)
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
@@ -82,8 +91,6 @@
}
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
    // go ruleserver.TimeTicker()
    dataChan := make(chan []byte, 3)
    go s.serializeProto(ctx, dataChan)
run.go
@@ -8,8 +8,6 @@
    "face/common"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gogpu.git"
)
@@ -19,7 +17,7 @@
    handle *SDKFace
    maxChannel      int
    ftrackChans     map[string]chan protomsg.SdkMessage
    ftrackChans     map[string]chan common.MsgRS
    ftrackChannels  map[string]int
    ftrackChanStats []bool
    chnLock         sync.Mutex
@@ -139,7 +137,7 @@
        handle: handle,
        maxChannel:      maxChan,
        ftrackChans:     make(map[string]chan protomsg.SdkMessage, maxChan),
        ftrackChans:     make(map[string]chan common.MsgRS, maxChan),
        ftrackChannels:  make(map[string]int, maxChan),
        ftrackChanStats: make([]bool, maxChan, maxChan),
@@ -173,8 +171,8 @@
    )
    ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
    ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
    chRcv := make(chan []byte, s.maxChannel)
    chSnd := make(chan sdkstruct.MsgSDK, s.maxChannel)
    chRcv := make(chan common.MsgRS, s.maxChannel)
    chSnd := make(chan common.MsgRS, s.maxChannel)
    rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
    snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
@@ -223,36 +221,33 @@
    f.chnLock.Unlock()
}
func (f *face) run(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK) {
    chMsg := make(chan protomsg.SdkMessage, f.maxChannel)
    go common.UnserilizeProto(ctx, in, chMsg, f.fnLogger)
func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) {
    for {
        select {
        case <-ctx.Done():
            f.handle.Free()
            return
        case rMsg := <-chMsg:
        case rMsg := <-in:
            if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
                f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
                common.EjectResult(nil, rMsg, out)
                continue
            }
            if _, ok := f.ftrackChans[rMsg.Cid]; ok {
            if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok {
                f.fnLogger("Face Cache Size: ", len(f.ftrackChans))
                f.ftrackChans[rMsg.Cid] <- rMsg
                f.ftrackChans[rMsg.Msg.Cid] <- rMsg
            } else {
                f.ftrackChans[rMsg.Cid] = make(chan protomsg.SdkMessage, f.maxChannel)
                f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel)
                chn := f.getAvailableChn()
                if chn < 0 {
                    f.fnLogger("TOO MUCH CHANNEL")
                    common.EjectResult(nil, rMsg, out)
                    continue
                }
                f.ftrackChannels[rMsg.Cid] = chn
                f.ftrackChannels[rMsg.Msg.Cid] = chn
                i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
                if i == nil {
@@ -262,9 +257,9 @@
                // conv to bgr24 and resize
                imgW, imgH := int(i.Width), int(i.Height)
                ret := f.handle.TrackerResize(imgW, imgH, chn)
                f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
                go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Cid], out, chn)
                f.ftrackChans[rMsg.Cid] <- rMsg
                f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
                go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn)
                f.ftrackChans[rMsg.Msg.Cid] <- rMsg
            }
        default:
            time.Sleep(time.Millisecond * 100)
@@ -272,7 +267,7 @@
    }
}
func (f *face) detectTrackOneChn(ctx context.Context, in <-chan protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, dtchn int) {
func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) {
    tm := time.Now()
    sc := 0
    f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
@@ -301,7 +296,7 @@
            // conv to bgr24 and resize
            imgW, imgH := int(i.Width), int(i.Height)
            f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Cid)
            f.fnLogger("Face Start Run:", dtchn, "CAMERAID: ", rMsg.Msg.Cid)
            count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
@@ -311,10 +306,10 @@
            f.mtxRunning.Unlock()
            var id, name string
            if rMsg.Tasklab != nil {
                id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
            if rMsg.Msg.Tasklab != nil {
                id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname
            }
            f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
            f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
            sc++
            if sc == 25 {