package main
|
|
import (
|
"context"
|
"sync"
|
"time"
|
|
"basic.com/libgowrapper/sdkhelper.git"
|
|
"basic.com/libgowrapper/sdkstruct.git"
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/gogpu.git"
|
)
|
|
// channel is the bookkeeping record kept in face.mapChan for one camera id.
type channel struct {
	// index is the tracker slot assigned to the camera; it is chosen from
	// [0, maxChannel) and handed to the SDK as the channel number.
	index int
	// life is the remaining time-to-live counter. It starts at face.maxLife,
	// is adjusted by maintainChannel for every id received, and the record is
	// dropped once it falls below zero.
	life int
}
|
|
func (f *face) channelAvailable() int {
|
f.mapChanLock.Lock()
|
defer f.mapChanLock.Unlock()
|
|
for i := 0; i < f.maxChannel; i++ {
|
found := false
|
for _, v := range f.mapChan {
|
if v.index == i {
|
found = true
|
break
|
}
|
}
|
if !found {
|
return i
|
}
|
}
|
return -1
|
}
|
|
func (f *face) channelExist(id string) (*channel, bool) {
|
f.mapChanLock.Lock()
|
defer f.mapChanLock.Unlock()
|
|
c, ok := f.mapChan[id]
|
return c, ok
|
}
|
|
func (f *face) channelNew(id string) *channel {
|
|
index := f.channelAvailable()
|
if index < 0 {
|
return nil
|
}
|
|
f.mapChanLock.Lock()
|
c := &channel{
|
index: index,
|
life: f.maxLife,
|
}
|
f.mapChan[id] = c
|
f.mapChanLock.Unlock()
|
|
return c
|
}
|
|
func (f *face) maintainChannel(ctx context.Context, ch <-chan string) {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case id := <-ch:
|
f.mapChanLock.Lock()
|
for k, v := range f.mapChan {
|
v.life--
|
if k == id {
|
v.life = f.maxLife
|
}
|
}
|
for k, v := range f.mapChan {
|
if v.life < 0 {
|
delete(f.mapChan, k)
|
}
|
}
|
f.mapChanLock.Unlock()
|
|
default:
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
|
type face struct {
|
handle *SDKFace
|
list *sdkhelper.LockList
|
|
maxChannel int
|
|
maxLife int
|
chLife chan string
|
mapChan map[string]*channel
|
mapChanLock sync.Mutex
|
|
fnLogger func(...interface{})
|
|
typ string
|
id string
|
shm bool
|
ipc2Rule string
|
ruleMsgMaxCacheSize int
|
reserved map[string]interface{}
|
}
|
|
// Create create sdk
|
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
|
|
handle := NewSDK(fn)
|
if handle == nil {
|
fn("Face SDK Create Error When New SDK")
|
return nil
|
}
|
cfg, err := sdkhelper.ReadConfig(config)
|
if err != nil {
|
fn("Face SDK Create Error When Read Config: ", err)
|
return nil
|
}
|
|
// 此处与传入的配置文件param必须一一对应,否则出错
|
dt, dn, da, pt, et, ti, ts, mc, gm :=
|
"detect_thread",
|
"detect_num",
|
"detect_angle",
|
"property_thread",
|
"extract_thread",
|
"track_interval",
|
"track_sample",
|
"max_channel",
|
"gpu-memory"
|
|
params := []string{dt, dn, da, pt, et, ti, ts, mc, gm}
|
|
for _, v := range params {
|
if _, ok := cfg.Param[v]; !ok {
|
fn("Face SDK Create Error Because of Param Not Found: ", v)
|
return nil
|
}
|
}
|
|
w, h, detThrd, detNum, detAngle, propThrd, extThrd, trckInterval, trckSmpl, maxChan, gpuM :=
|
1280, 720, sdkhelper.Atoi(cfg.Param[dt]), sdkhelper.Atoi(cfg.Param[dn]), sdkhelper.Atoi(cfg.Param[da]),
|
sdkhelper.Atoi(cfg.Param[pt]), sdkhelper.Atoi(cfg.Param[et]), sdkhelper.Atoi(cfg.Param[ti]), sdkhelper.Atoi(cfg.Param[ts]),
|
sdkhelper.Atoi(cfg.Param[mc]), sdkhelper.Atoi(cfg.Param[gm])
|
|
if detAngle > 0 {
|
}
|
|
rGPU := gpu
|
|
if rGPU == -1 {
|
rGPU = gogpu.ValidGPU(gpuM + 512)
|
}
|
if rGPU == -1 {
|
fn("Face SDK Create Error When Find GPU")
|
return nil
|
}
|
|
if !handle.Tracker(w, h, detNum, trckInterval, trckSmpl, detThrd, rGPU) {
|
fn("Face SDK Create Error When Init Tracker")
|
return nil
|
}
|
if !handle.Extractor(extThrd, rGPU) {
|
fn("Face SDK Create Error When Init Extractor")
|
return nil
|
}
|
|
if !handle.Propertizer(propThrd) {
|
fn("Face SDK Create Error When Init Propertizer")
|
return nil
|
}
|
|
return &face{
|
handle: handle,
|
list: sdkhelper.NewLockList(maxChan + maxChan/2),
|
|
maxLife: trckInterval * 10,
|
chLife: make(chan string, maxChan),
|
maxChannel: maxChan,
|
mapChan: make(map[string]*channel, maxChan),
|
|
fnLogger: fn,
|
|
typ: typ,
|
id: id,
|
shm: shm,
|
ipc2Rule: ipc2Rule,
|
ruleMsgMaxCacheSize: ruleMaxSize,
|
reserved: reserved,
|
}
|
}
|
|
func (f *face) release() {
|
if f.handle != nil {
|
f.handle.Free()
|
}
|
}
|
|
// Run run
|
func Run(ctx context.Context, i interface{}) {
|
s := i.(*face)
|
|
go s.maintainChannel(ctx, s.chLife)
|
|
chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
|
go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger)
|
}
|
|
func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) {
|
|
wg := &sync.WaitGroup{}
|
|
for _, msg := range msgs {
|
f.chLife <- msg.Cid
|
|
if c, ok := f.channelExist(msg.Cid); ok {
|
wg.Add(1)
|
go f.track(wg, &msg, c.index, out)
|
} else {
|
|
nc := f.channelNew(msg.Cid)
|
if nc == nil {
|
f.fnLogger("TOO MUCH CHANNEL")
|
sdkhelper.EjectResult(nil, msg, out)
|
continue
|
}
|
|
wg.Add(1)
|
go f.track(wg, &msg, nc.index, out)
|
}
|
}
|
wg.Wait()
|
f.fnLogger("######Wait for All track")
|
}
|
|
func (f *face) track(wg *sync.WaitGroup, msg *protomsg.SdkMessage, tchan int, out chan<- sdkstruct.MsgSDK) {
|
|
defer wg.Done()
|
|
rMsg := *msg
|
|
if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
|
sdkhelper.EjectResult(nil, rMsg, out)
|
f.fnLogger("Face!!!!!!SkdMessage Invalid: ", tchan)
|
|
return
|
}
|
|
i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
|
if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
|
sdkhelper.EjectResult(nil, rMsg, out)
|
f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed: ", tchan)
|
|
return
|
}
|
|
// conv to bgr24 and resize
|
imgW, imgH := int(i.Width), int(i.Height)
|
|
ret := f.handle.TrackerResize(imgW, imgH, tchan)
|
f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", tchan, " wXh: ", imgW, "x", imgH, " result:", ret)
|
|
count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, tchan)
|
|
sdkhelper.EjectResult(data, rMsg, out)
|
|
var id, name string
|
if rMsg.Tasklab != nil {
|
id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
|
}
|
f.fnLogger("Chan: ", tchan, "CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
|
|
}
|