package main
|
|
import (
|
"context"
|
"os"
|
"sync"
|
"time"
|
"unsafe"
|
|
"face/common"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/gogpu.git"
|
"github.com/gogo/protobuf/proto"
|
)
|
|
// maxTryBeforeReboot is the number of consecutive watchdog passes without any
// processed frame that maybeReboot tolerates; once the counter exceeds it the
// process exits.
const maxTryBeforeReboot = 10
|
|
type face struct {
|
handle *SDKFace
|
|
maxChannel int
|
ftrackChans map[string]chan common.MsgRS
|
ftrackChannels map[string]int
|
ftrackChanStats []bool
|
chnLock sync.Mutex
|
|
fnLogger func(...interface{})
|
|
typ string
|
id string
|
shm bool
|
ipc2Rule string
|
ruleMsgMaxCacheSize int
|
reserved map[string]interface{}
|
|
running bool
|
rebootUntil int
|
mtxRunning sync.Mutex
|
}
|
|
func (f *face) maybeReboot(ctx context.Context) {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
f.mtxRunning.Lock()
|
running := f.running
|
f.mtxRunning.Unlock()
|
|
if running {
|
f.rebootUntil = 0
|
|
f.mtxRunning.Lock()
|
f.running = false
|
f.mtxRunning.Unlock()
|
|
} else {
|
f.rebootUntil++
|
f.fnLogger("Face No Running: ", f.rebootUntil)
|
if f.rebootUntil > maxTryBeforeReboot {
|
f.fnLogger("Face Too Long Running, Reboot")
|
os.Exit(0)
|
}
|
}
|
time.Sleep(time.Second)
|
}
|
}
|
}
|
|
// Create create sdk
|
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
|
|
handle := NewSDK(fn)
|
if handle == nil {
|
fn("Face SDK Create Error When New SDK")
|
return nil
|
}
|
cfg, err := common.ReadConfig(config)
|
if err != nil {
|
fn("Face SDK Create Error When Read Config: ", err)
|
return nil
|
}
|
|
// 此处与传入的配置文件param必须一一对应,否则出错
|
dt, dn, da, pt, et, ti, ts, mc, gm :=
|
"detect_thread",
|
"detect_num",
|
"detect_angle",
|
"property_thread",
|
"extract_thread",
|
"track_interval",
|
"track_sample",
|
"max_channel",
|
"gpu-memory"
|
|
params := []string{dt, dn, da, pt, et, ti, ts, mc, gm}
|
|
for _, v := range params {
|
if _, ok := cfg.Param[v]; !ok {
|
fn("Face SDK Create Error Because of Param Not Found: ", v)
|
return nil
|
}
|
}
|
|
w, h, detThrd, detNum, detAngle, propThrd, extThrd, trckInterval, trckSmpl, maxChan, gpuM :=
|
1280, 720, common.Atoi(cfg.Param[dt]), common.Atoi(cfg.Param[dn]), common.Atoi(cfg.Param[da]),
|
common.Atoi(cfg.Param[pt]), common.Atoi(cfg.Param[et]), common.Atoi(cfg.Param[ti]), common.Atoi(cfg.Param[ts]),
|
common.Atoi(cfg.Param[mc]), common.Atoi(cfg.Param[gm])
|
|
if detAngle > 0 {
|
}
|
|
rGPU := gpu
|
|
if rGPU == -1 {
|
rGPU = gogpu.ValidGPU(gpuM + 512)
|
}
|
if rGPU == -1 {
|
fn("Face SDK Create Error When Find GPU")
|
return nil
|
}
|
|
if !handle.Tracker(w, h, detNum, trckInterval, trckSmpl, detThrd, rGPU) {
|
fn("Face SDK Create Error When Init Tracker")
|
return nil
|
}
|
if !handle.Extractor(extThrd, rGPU) {
|
fn("Face SDK Create Error When Init Extractor")
|
return nil
|
}
|
|
if !handle.Propertizer(propThrd) {
|
fn("Face SDK Create Error When Init Propertizer")
|
return nil
|
}
|
|
return &face{
|
handle: handle,
|
|
maxChannel: maxChan,
|
ftrackChans: make(map[string]chan common.MsgRS, maxChan),
|
ftrackChannels: make(map[string]int, maxChan),
|
ftrackChanStats: make([]bool, maxChan, maxChan),
|
|
fnLogger: fn,
|
|
typ: typ,
|
id: id,
|
shm: shm,
|
ipc2Rule: ipc2Rule,
|
ruleMsgMaxCacheSize: ruleMaxSize,
|
reserved: reserved,
|
|
running: true,
|
rebootUntil: maxTryBeforeReboot,
|
}
|
}
|
|
func (f *face) release() {
|
if f.handle != nil {
|
f.handle.Free()
|
}
|
}
|
|
// Run run
|
func Run(ctx context.Context, i interface{}) {
|
s := i.(*face)
|
|
const (
|
postPull = `_1`
|
postPush = `_2`
|
)
|
ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
|
ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
|
chRcv := make(chan common.MsgRS, s.maxChannel)
|
chSnd := make(chan common.MsgRS, s.maxChannel)
|
|
rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
|
snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
|
torule := common.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
|
|
snder.ApplyCallbackFunc(torule.Push)
|
|
go rcver.Run(ctx)
|
go snder.Run(ctx)
|
go torule.Run(ctx)
|
|
go s.run(ctx, chRcv, chSnd)
|
|
go s.maybeReboot(ctx)
|
}
|
|
//////////////////////////////////////////////////////////////////
|
// trackChnTimeout is a bare count (10), typed as time.Duration; the use site
// multiplies it by time.Second to obtain the per-channel idle timeout.
const trackChnTimeout time.Duration = 10
|
|
func (f *face) cleanChnStat() {
|
f.chnLock.Lock()
|
for i := 0; i < f.maxChannel; i++ {
|
f.ftrackChanStats[i] = false
|
}
|
f.chnLock.Unlock()
|
}
|
|
func (f *face) getAvailableChn() int {
|
f.chnLock.Lock()
|
defer f.chnLock.Unlock()
|
|
for i := 0; i < f.maxChannel; i++ {
|
if f.ftrackChanStats[i] == false {
|
f.ftrackChanStats[i] = true
|
return i
|
}
|
}
|
return -1
|
}
|
|
func (f *face) releaseChn(chn int) {
|
f.chnLock.Lock()
|
f.ftrackChanStats[chn] = false
|
f.chnLock.Unlock()
|
}
|
|
func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) {
|
|
for {
|
select {
|
case <-ctx.Done():
|
f.handle.Free()
|
return
|
case rMsg := <-in:
|
if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
|
f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
|
common.EjectResult(nil, rMsg, out)
|
continue
|
}
|
|
if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok {
|
f.fnLogger("Face Cache Size: ", len(f.ftrackChans))
|
f.ftrackChans[rMsg.Msg.Cid] <- rMsg
|
} else {
|
|
f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel)
|
chn := f.getAvailableChn()
|
if chn < 0 {
|
f.fnLogger("TOO MUCH CHANNEL")
|
common.EjectResult(nil, rMsg, out)
|
continue
|
}
|
f.ftrackChannels[rMsg.Msg.Cid] = chn
|
|
i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
|
if i == nil {
|
common.EjectResult(nil, rMsg, out)
|
continue
|
}
|
// conv to bgr24 and resize
|
imgW, imgH := int(i.Width), int(i.Height)
|
ret := f.handle.TrackerResize(imgW, imgH, chn)
|
f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
|
go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn)
|
f.ftrackChans[rMsg.Msg.Cid] <- rMsg
|
}
|
default:
|
time.Sleep(time.Millisecond * 100)
|
}
|
}
|
}
|
|
func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) {
|
tm := time.Now()
|
sc := 0
|
f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
|
var curCid string
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
|
case rMsg := <-in:
|
|
if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
|
common.EjectResult(nil, rMsg, out)
|
continue
|
}
|
|
i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
|
if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
|
common.EjectResult(nil, rMsg, out)
|
continue
|
}
|
|
curCid = i.Cid
|
|
// conv to bgr24 and resize
|
imgW, imgH := int(i.Width), int(i.Height)
|
|
// f.fnLogger("Face~~~Track", dtchn)
|
detect := f.handle.Track(i.Data, imgW, imgH, 3, dtchn)
|
// f.fnLogger("Face~~~Track Over", dtchn)
|
|
var faces []*protomsg.ResultFaceDetect
|
|
//将sdk返回值转换成protomsg类型
|
for _, d := range detect {
|
|
//运行sd
|
// f.fnLogger("Face~~~FaceInfo2FacePos", dtchn)
|
dec := FaceInfo2FacePos(d)
|
// f.fnLogger("Face~~~FaceInfo2FacePos over", dtchn)
|
|
// f.fnLogger("Face~~~Propertize", dtchn)
|
prop := f.handle.Propertize(dec, i.Data, imgW, imgH, 3, dtchn)
|
// f.fnLogger("Face~~~Propertize Over", dtchn)
|
|
f.fnLogger("Face~~~Extract", dtchn)
|
feat := f.handle.Extract(dec, i.Data, imgW, imgH, 3, dtchn)
|
f.fnLogger("Face~~~Extract Over", dtchn)
|
|
// f.fnLogger("Face~~~protomsg.ThftResult", dtchn)
|
resP := (*protomsg.ThftResult)(unsafe.Pointer(&prop))
|
// f.fnLogger("Face~~~protomsg.ThftResult Over", dtchn)
|
|
// f.fnLogger("Face~~~tconvert2ProtoFacePos", dtchn)
|
result := tconvert2ProtoFacePos(d)
|
// f.fnLogger("Face~~~tconvert2ProtoFacePos Over", dtchn)
|
|
//组成结果并序列化
|
res := &protomsg.ResultFaceDetect{Pos: result, Result: resP, Feats: feat}
|
faces = append(faces, res)
|
|
}
|
|
var err error
|
var data []byte
|
if len(faces) > 0 {
|
|
// logo.Infoln("CID: ", rMsg.Msg.Cid, " TASK: ", rMsg.Msg.Tasklab.Taskid, " FACE TRACK OBJS: ", len(faces))
|
|
facePos := protomsg.ParamFacePos{Faces: faces}
|
data, err = proto.Marshal(&facePos)
|
if err != nil {
|
f.fnLogger("fdetect marshal proto face pos error", err)
|
data = nil
|
}
|
}
|
|
// f.fnLogger("Face~~~EjectResult", dtchn)
|
common.EjectResult(data, rMsg, out)
|
// f.fnLogger("Face~~~EjectResult Over", dtchn)
|
f.mtxRunning.Lock()
|
f.running = true
|
f.mtxRunning.Unlock()
|
|
var id, name string
|
if rMsg.Msg.Tasklab != nil {
|
id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname
|
}
|
|
f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", len(faces))
|
|
sc++
|
if sc == 25 {
|
f.fnLogger("CHAN:%d, FACE RUN 25 FRAME USE TIME: ", dtchn, time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
|
if time.Since(tm) > time.Second {
|
f.fnLogger("CHAN: ", dtchn, " FACE RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
case <-time.After(trackChnTimeout * time.Second):
|
f.fnLogger("Timeout to get image, curCid:", curCid)
|
if curCid != "" {
|
delete(f.ftrackChans, curCid)
|
f.releaseChn(dtchn)
|
}
|
return
|
|
}
|
}
|
}
|