派生自 libgowrapper/face

zhangmeng
2020-01-15 0ffd95f2278e860736e46f8b73f357470f5a3d91
run.go
@@ -2,84 +2,28 @@
import (
   "context"
   "os"
   "sync"
   "time"
   "unsafe"
   "basic.com/libgowrapper/sdkhelper.git"
   "face/common"
   "basic.com/libgowrapper/sdkstruct.git"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/gogpu.git"
   "github.com/gogo/protobuf/proto"
)
type channel struct {
   index int
   msg   chan protomsg.SdkMessage
}
//////////////////////////////////////////////////////////////////
const (
   cacheFrameNum   = 3
   trackChnTimeout = time.Duration(10)
)
func (f *face) channelAvailable() int {
   for i := 0; i < f.maxChannel; i++ {
      found := false
      for _, v := range f.mapChan {
         if v.index == i {
            found = true
            break
         }
      }
      if !found {
         return i
      }
   }
   return -1
}
func (f *face) channelRelease(id string) {
   f.mapChanLock.Lock()
   defer f.mapChanLock.Unlock()
   delete(f.mapChan, id)
}
func (f *face) channelExist(id string) (*channel, bool) {
   f.mapChanLock.Lock()
   defer f.mapChanLock.Unlock()
   c, ok := f.mapChan[id]
   return c, ok
}
func (f *face) channelNew(id string) *channel {
   f.mapChanLock.Lock()
   defer f.mapChanLock.Unlock()
   index := f.channelAvailable()
   if index < 0 {
      return nil
   }
   c := &channel{
      index: index,
      msg:   make(chan protomsg.SdkMessage, cacheFrameNum),
   }
   f.mapChan[id] = c
   return c
}
const maxTryBeforeReboot = 10
type face struct {
   handle *SDKFace
   list   *sdkhelper.LockList
   maxChannel  int
   mapChan     map[string]*channel
   mapChanLock sync.Mutex
   maxChannel      int
   ftrackChans     map[string]chan common.MsgRS
   ftrackChannels  map[string]int
   ftrackChanStats []bool
   chnLock         sync.Mutex
   fnLogger func(...interface{})
@@ -89,6 +33,40 @@
   ipc2Rule            string
   ruleMsgMaxCacheSize int
   reserved            map[string]interface{}
   running     bool
   rebootUntil int
   mtxRunning  sync.Mutex
}
func (f *face) maybeReboot(ctx context.Context) {
   for {
      select {
      case <-ctx.Done():
         return
      default:
         f.mtxRunning.Lock()
         running := f.running
         f.mtxRunning.Unlock()
         if running {
            f.rebootUntil = 0
            f.mtxRunning.Lock()
            f.running = false
            f.mtxRunning.Unlock()
         } else {
            f.rebootUntil++
            f.fnLogger("Face No Running: ", f.rebootUntil)
            if f.rebootUntil > maxTryBeforeReboot {
               f.fnLogger("Face Too Long Running, Reboot")
               os.Exit(0)
            }
         }
         time.Sleep(time.Second)
      }
   }
}
// Create create sdk
@@ -99,7 +77,7 @@
      fn("Face SDK Create Error When New SDK")
      return nil
   }
   cfg, err := sdkhelper.ReadConfig(config)
   cfg, err := common.ReadConfig(config)
   if err != nil {
      fn("Face SDK Create Error When Read Config: ", err)
      return nil
@@ -127,9 +105,9 @@
   }
   w, h, detThrd, detNum, detAngle, propThrd, extThrd, trckInterval, trckSmpl, maxChan, gpuM :=
      1280, 720, sdkhelper.Atoi(cfg.Param[dt]), sdkhelper.Atoi(cfg.Param[dn]), sdkhelper.Atoi(cfg.Param[da]),
      sdkhelper.Atoi(cfg.Param[pt]), sdkhelper.Atoi(cfg.Param[et]), sdkhelper.Atoi(cfg.Param[ti]), sdkhelper.Atoi(cfg.Param[ts]),
      sdkhelper.Atoi(cfg.Param[mc]), sdkhelper.Atoi(cfg.Param[gm])
      1280, 720, common.Atoi(cfg.Param[dt]), common.Atoi(cfg.Param[dn]), common.Atoi(cfg.Param[da]),
      common.Atoi(cfg.Param[pt]), common.Atoi(cfg.Param[et]), common.Atoi(cfg.Param[ti]), common.Atoi(cfg.Param[ts]),
      common.Atoi(cfg.Param[mc]), common.Atoi(cfg.Param[gm])
   if detAngle > 0 {
   }
@@ -160,10 +138,11 @@
   return &face{
      handle: handle,
      list:   sdkhelper.NewLockList(maxChan + maxChan/2),
      maxChannel: maxChan,
      mapChan:    make(map[string]*channel, maxChan),
      maxChannel:      maxChan,
      ftrackChans:     make(map[string]chan common.MsgRS, maxChan),
      ftrackChannels:  make(map[string]int, maxChan),
      ftrackChanStats: make([]bool, maxChan, maxChan),
      fnLogger: fn,
@@ -173,6 +152,9 @@
      ipc2Rule:            ipc2Rule,
      ruleMsgMaxCacheSize: ruleMaxSize,
      reserved:            reserved,
      running:     true,
      rebootUntil: maxTryBeforeReboot,
   }
}
@@ -186,67 +168,129 @@
func Run(ctx context.Context, i interface{}) {
   s := i.(*face)
   chRcv, chSnd := sdkhelper.FlowCreate(ctx, s.id, s.shm, s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
   go sdkhelper.FlowBatch(ctx, chRcv, chSnd, s.typ, s.list.Push, s.list.Drain, s.run, s.release, s.fnLogger)
   const (
      postPull = `_1`
      postPush = `_2`
   )
   ipcRcv := common.GetIpcAddress(s.shm, s.id+postPull)
   ipcSnd := common.GetIpcAddress(s.shm, s.id+postPush)
   chRcv := make(chan common.MsgRS, s.maxChannel)
   chSnd := make(chan common.MsgRS, s.maxChannel)
   rcver := common.NewReciever(ipcRcv, chRcv, s.shm, s.fnLogger)
   snder := common.NewSender(ipcSnd, chSnd, s.shm, s.fnLogger)
   torule := common.NewToRule(s.ipc2Rule, s.ruleMsgMaxCacheSize, s.fnLogger)
   snder.ApplyCallbackFunc(torule.Push)
   go rcver.Run(ctx)
   go snder.Run(ctx)
   go torule.Run(ctx)
   go s.run(ctx, chRcv, chSnd)
   go s.maybeReboot(ctx)
}
func (f *face) run(msgs []protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK, typ string) {
//////////////////////////////////////////////////////////////////
const (
   trackChnTimeout = time.Duration(10)
)
   f.fnLogger("######run batch images")
func (f *face) cleanChnStat() {
   f.chnLock.Lock()
   for i := 0; i < f.maxChannel; i++ {
      f.ftrackChanStats[i] = false
   }
   f.chnLock.Unlock()
}
   for _, msg := range msgs {
      if c, ok := f.channelExist(msg.Cid); ok {
         c.msg <- msg
      } else {
func (f *face) getAvailableChn() int {
   f.chnLock.Lock()
   defer f.chnLock.Unlock()
         nc := f.channelNew(msg.Cid)
         if nc == nil {
            f.fnLogger("TOO MUCH CHANNEL")
            sdkhelper.EjectResult(nil, msg, out)
            continue
         }
         i := sdkhelper.UnpackImage(msg, f.typ, f.fnLogger)
         if i == nil {
            sdkhelper.EjectResult(nil, msg, out)
            continue
         }
         // conv to bgr24 and resize
         imgW, imgH := int(i.Width), int(i.Height)
         ret := f.handle.TrackerResize(imgW, imgH, nc.index)
         f.fnLogger("ResizeFaceTracker: cid: ", msg.Cid, " chan: ", nc.index, " wXh: ", imgW, "x", imgH, " result:", ret)
         go f.detectTrackOneChn(c, out)
         c.msg <- msg
   for i := 0; i < f.maxChannel; i++ {
      if f.ftrackChanStats[i] == false {
         f.ftrackChanStats[i] = true
         return i
      }
   }
   f.fnLogger("######run over batch images")
   return -1
}
func (f *face) detectTrackOneChn(c *channel, out chan<- sdkstruct.MsgSDK) {
func (f *face) releaseChn(chn int) {
   f.chnLock.Lock()
   f.ftrackChanStats[chn] = false
   f.chnLock.Unlock()
}
func (f *face) run(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS) {
   for {
      select {
      case <-ctx.Done():
         f.handle.Free()
         return
      case rMsg := <-in:
         if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
            f.fnLogger("FACE TRACK VALIDREMOTEMESSAGE INVALID")
            common.EjectResult(nil, rMsg, out)
            continue
         }
         if _, ok := f.ftrackChans[rMsg.Msg.Cid]; ok {
            f.fnLogger("Face Cache Size: ", len(f.ftrackChans))
            f.ftrackChans[rMsg.Msg.Cid] <- rMsg
         } else {
            f.ftrackChans[rMsg.Msg.Cid] = make(chan common.MsgRS, f.maxChannel)
            chn := f.getAvailableChn()
            if chn < 0 {
               f.fnLogger("TOO MUCH CHANNEL")
               common.EjectResult(nil, rMsg, out)
               continue
            }
            f.ftrackChannels[rMsg.Msg.Cid] = chn
            i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
            if i == nil {
               common.EjectResult(nil, rMsg, out)
               continue
            }
            // conv to bgr24 and resize
            imgW, imgH := int(i.Width), int(i.Height)
            ret := f.handle.TrackerResize(imgW, imgH, chn)
            f.fnLogger("ResizeFaceTracker: cid: ", rMsg.Msg.Cid, " chan: ", chn, " wXh: ", imgW, "x", imgH, " result:", ret)
            go f.detectTrackOneChn(ctx, f.ftrackChans[rMsg.Msg.Cid], out, chn)
            f.ftrackChans[rMsg.Msg.Cid] <- rMsg
         }
      default:
         time.Sleep(time.Millisecond * 100)
      }
   }
}
func (f *face) detectTrackOneChn(ctx context.Context, in <-chan common.MsgRS, out chan<- common.MsgRS, dtchn int) {
   tm := time.Now()
   sc := 0
   dtchn := c.index
   f.fnLogger("DETECTTRACKONECHN DTCHN: ", dtchn)
   var curCid string
   for {
      select {
      case rMsg := <-c.msg:
      case <-ctx.Done():
         return
         if !sdkhelper.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
            sdkhelper.EjectResult(nil, rMsg, out)
            f.fnLogger("Face!!!!!!SkdMessage Invalid")
      case rMsg := <-in:
         if !common.ValidRemoteMessage(rMsg, f.typ, f.fnLogger) {
            common.EjectResult(nil, rMsg, out)
            continue
         }
         i := sdkhelper.UnpackImage(rMsg, f.typ, f.fnLogger)
         i := common.UnpackImage(rMsg, f.typ, f.fnLogger)
         if i == nil || i.Data == nil || i.Width <= 0 || i.Height <= 0 {
            sdkhelper.EjectResult(nil, rMsg, out)
            f.fnLogger("Face!!!!!!Unpack Image From SkdMessage Failed")
            common.EjectResult(nil, rMsg, out)
            continue
         }
@@ -255,14 +299,69 @@
         // conv to bgr24 and resize
         imgW, imgH := int(i.Width), int(i.Height)
         count, data, _ := f.handle.Run(i.Data, imgW, imgH, 3, dtchn)
         // f.fnLogger("Face~~~Track", dtchn)
         detect := f.handle.Track(i.Data, imgW, imgH, 3, dtchn)
         // f.fnLogger("Face~~~Track Over", dtchn)
         sdkhelper.EjectResult(data, rMsg, out)
         var id, name string
         if rMsg.Tasklab != nil {
            id, name = rMsg.Tasklab.Taskid, rMsg.Tasklab.Taskname
         var faces []*protomsg.ResultFaceDetect
         //将sdk返回值转换成protomsg类型
         for _, d := range detect {
            //运行sd
            // f.fnLogger("Face~~~FaceInfo2FacePos", dtchn)
            dec := FaceInfo2FacePos(d)
            // f.fnLogger("Face~~~FaceInfo2FacePos over", dtchn)
            // f.fnLogger("Face~~~Propertize", dtchn)
            prop := f.handle.Propertize(dec, i.Data, imgW, imgH, 3, dtchn)
            // f.fnLogger("Face~~~Propertize Over", dtchn)
            f.fnLogger("Face~~~Extract", dtchn)
            feat := f.handle.Extract(dec, i.Data, imgW, imgH, 3, dtchn)
            f.fnLogger("Face~~~Extract Over", dtchn)
            // f.fnLogger("Face~~~protomsg.ThftResult", dtchn)
            resP := (*protomsg.ThftResult)(unsafe.Pointer(&prop))
            // f.fnLogger("Face~~~protomsg.ThftResult Over", dtchn)
            // f.fnLogger("Face~~~tconvert2ProtoFacePos", dtchn)
            result := tconvert2ProtoFacePos(d)
            // f.fnLogger("Face~~~tconvert2ProtoFacePos Over", dtchn)
            //组成结果并序列化
            res := &protomsg.ResultFaceDetect{Pos: result, Result: resP, Feats: feat}
            faces = append(faces, res)
         }
         f.fnLogger("CAMERAID: ", rMsg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", count)
         var err error
         var data []byte
         if len(faces) > 0 {
            // logo.Infoln("CID: ", rMsg.Msg.Cid, " TASK: ", rMsg.Msg.Tasklab.Taskid, " FACE TRACK OBJS: ", len(faces))
            facePos := protomsg.ParamFacePos{Faces: faces}
            data, err = proto.Marshal(&facePos)
            if err != nil {
               f.fnLogger("fdetect marshal proto face pos error", err)
               data = nil
            }
         }
         // f.fnLogger("Face~~~EjectResult", dtchn)
         common.EjectResult(data, rMsg, out)
         // f.fnLogger("Face~~~EjectResult Over", dtchn)
         f.mtxRunning.Lock()
         f.running = true
         f.mtxRunning.Unlock()
         var id, name string
         if rMsg.Msg.Tasklab != nil {
            id, name = rMsg.Msg.Tasklab.Taskid, rMsg.Msg.Tasklab.Taskname
         }
         f.fnLogger("Chan:", dtchn, "CAMERAID: ", rMsg.Msg.Cid, " TASKID: ", id, " TASKNAME: ", name, " DETECT ", f.typ, " COUNT: ", len(faces))
         sc++
         if sc == 25 {
@@ -279,9 +378,11 @@
      case <-time.After(trackChnTimeout * time.Second):
         f.fnLogger("Timeout to get image, curCid:", curCid)
         if curCid != "" {
            f.channelRelease(curCid)
            delete(f.ftrackChans, curCid)
            f.releaseChn(dtchn)
         }
         return
      }
   }
}