package service
|
|
import (
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/deliver.git"
|
"github.com/gogo/protobuf/proto"
|
"github.com/pierrec/lz4"
|
"github.com/pkg/errors"
|
"github.com/satori/go.uuid"
|
"gocv.io/x/gocv"
|
"image"
|
"sync"
|
"time"
|
"webserver/extend/logger"
|
"webserver/extend/util"
|
)
|
|
type FaceSdkService struct {
|
File []byte
|
Id int64
|
Result []*protomsg.ResultFaceDetect
|
DeadTime time.Duration
|
}
|
|
const (
|
Ipc_Push_Ext = "_2.ipc"
|
Ipc_Pull_Ext = "_1.ipc"
|
Ipc_Url_Pre = "ipc:///tmp///"
|
faceExtractWebCID = "virtual-face-extract-web-camera-id"
|
Virtual_FaceTaskId = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"
|
Virtual_FaceSdkId = "virtual-faceextract-sdk-pull"
|
Url_Service_PUSH = Ipc_Url_Pre + Virtual_FaceSdkId + Ipc_Push_Ext
|
Url_Service_PULL = Ipc_Url_Pre + Virtual_FaceSdkId + Ipc_Pull_Ext
|
)
|
|
func GetFaceFeaFromSdk(fileBytes []byte,deadTime time.Duration) ([]*protomsg.ResultFaceDetect,error,*protomsg.Image){
|
t1 := time.Now()
|
s := NewFaceSdkService(fileBytes, deadTime)
|
i, err := s.ReadFromUploadImg()
|
logger.Debug("ReadFromUploadImg用时:", time.Since(t1))
|
t1 = time.Now()
|
if err !=nil{
|
logger.Debug("readFromUploadImg err:",err)
|
return nil,err,i
|
}
|
bc, err := ImgCompress(i)
|
logger.Debug("ImgCompress用时:", time.Since(t1))
|
t1 = time.Now()
|
if err !=nil {
|
logger.Debug("ImgCompress err:",err)
|
return nil,err,i
|
}
|
s.PushImgMsg(bc)
|
logger.Debug("PushImgMsg用时:", time.Since(t1))
|
t1 = time.Now()
|
s.GetFaceFea()
|
logger.Debug("GetFaceFea用时:", time.Since(t1))
|
if s.Result == nil{
|
return nil,errors.New("no fea"),i
|
} else {
|
return s.Result,nil,i
|
}
|
}
|
|
func NewFaceSdkService(fileBytes []byte, deadTime time.Duration) FaceSdkService{
|
return FaceSdkService{
|
File:fileBytes,
|
Id:time.Now().UnixNano(),
|
DeadTime:deadTime,
|
}
|
}
|
|
var imgPushChan chan []byte
|
var client_push deliver.Deliver
|
var client_pull deliver.Deliver
|
|
func TestPushImgMsg() {
|
InitService()
|
|
i := readTestImgFile()
|
|
logger.Debug("width:%d,height:%d,data.length:%d,timestamp:%s,id:%d\n",i.Width,i.Height,len(i.Data),i.Timestamp,i.Id)
|
|
bc, err := ImgCompress(&i)
|
if err !=nil {
|
logger.Debug("image is not compressible")
|
} else {
|
var s FaceSdkService
|
s.PushImgMsg(bc)
|
}
|
}
|
|
func (s *FaceSdkService) ReadFromUploadImg() (*protomsg.Image,error){
|
picMat, err := gocv.IMDecode(s.File, gocv.IMReadColor)
|
if err !=nil {
|
logger.Debug("gocv.IMDecode err:",err)
|
return nil,err
|
}
|
logger.Debug("picMat.Data.len:", len(picMat.ToBytes()))
|
newMat := gocv.NewMat()
|
if picMat.Rows() > 1000 || picMat.Cols() > 1000 {
|
gocv.Resize(picMat,&newMat, image.Pt(0,0), 0.5, 0.5, gocv.InterpolationDefault)
|
picMat = newMat
|
}
|
|
defer picMat.Close()
|
|
if picMat.Empty() {
|
logger.Debug("file not exist")
|
return nil,errors.New("picMat is empty")
|
}
|
height := int32(picMat.Rows())
|
width := int32(picMat.Cols())
|
data := picMat.ToBytes()
|
timeUnix := time.Now().Unix()
|
formatTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
|
return &protomsg.Image{
|
Width: width,
|
Height: height,
|
Timestamp: formatTimeStr,
|
Data: data,
|
Id: s.Id,
|
Cid: faceExtractWebCID,
|
},nil
|
}
|
|
func ImgCompress(i *protomsg.Image) ([]byte,error){
|
if b, err := proto.Marshal(i); err != nil {
|
logger.Debug("protoImage marshal err")
|
return nil,err
|
} else {
|
bc := make([]byte, len(b))
|
ht := make([]int, 64<<10)
|
n, err := lz4.CompressBlock(b, bc, ht)
|
if err != nil {
|
logger.Debug(err)
|
return nil,err
|
}
|
if n >= len(b) {
|
logger.Debug("image is not compressible")
|
return nil,errors.New("compressed len is 0")
|
}
|
bc = bc[:n]
|
return bc,nil
|
}
|
}
|
|
func (s *FaceSdkService) GetFaceFea(){
|
var wg sync.WaitGroup
|
wg.Add(1)
|
ticker := time.NewTicker(s.DeadTime)
|
go func(ticker *time.Ticker, s *FaceSdkService) {
|
defer ticker.Stop()
|
defer wg.Done()
|
|
for {
|
select {
|
case <-ticker.C:
|
return
|
default:
|
if faces,ok := resultMap.Get(s.Id);ok {
|
s.Result = faces
|
return
|
}
|
}
|
}
|
}(ticker, s)
|
wg.Wait()
|
}
|
|
func readTestImgFile() protomsg.Image{
|
var i protomsg.Image
|
timeUnix := time.Now().Unix()
|
formatTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
filePath := "/home/user/workspace/timg.jpg"
|
|
picMat := gocv.IMRead(filePath, gocv.IMReadColor)
|
|
defer picMat.Close()
|
|
if picMat.Empty() {
|
logger.Debug("file not exist")
|
return i
|
}
|
height := int32(picMat.Rows())
|
width := int32(picMat.Cols())
|
data := picMat.ToBytes()
|
//wrMat,_ := gocv.NewMatFromBytes(picMat.Rows(),picMat.Cols(),gocv.MatTypeCV8UC3,data)
|
//
|
//gocv.IMWrite("xxx.jpg", wrMat)
|
|
i = protomsg.Image{
|
Width: width,
|
Height: height,
|
Timestamp: formatTimeStr,
|
Data: data,
|
Id: timeUnix,
|
}
|
i.Cid = uuid.NewV4().String() //数据唯一id
|
logger.Debug("gocv read img completed")
|
return i
|
}
|
|
func (s *FaceSdkService) PushImgMsg(is []byte){
|
imgPushChan <- is
|
}
|
|
type FeaResult struct {
|
FaceM map[int64][]*protomsg.ResultFaceDetect
|
Lock sync.Mutex
|
}
|
|
func (f *FeaResult) Write(id int64,faceDetectResult []*protomsg.ResultFaceDetect){
|
f.Lock.Lock()
|
defer f.Lock.Unlock()
|
f.FaceM[id] = faceDetectResult
|
}
|
|
func (f *FeaResult) Get(id int64) ([]*protomsg.ResultFaceDetect,bool){
|
f.Lock.Lock()
|
defer f.Lock.Unlock()
|
coms,ok := f.FaceM[id]
|
return coms,ok
|
}
|
|
func (f *FeaResult) Delete(id int64){
|
f.Lock.Lock()
|
defer f.Lock.Unlock()
|
delete(f.FaceM,id)
|
}
|
|
var resultMap = FeaResult{}
|
|
func InitService() {
|
logger.Debug("service init!")
|
imgPushChan = make(chan []byte)
|
resultMap.FaceM = make(map[int64][]*protomsg.ResultFaceDetect,0)
|
client_push = deliver.NewClient(deliver.PushPull, Url_Service_PUSH)
|
client_pull = deliver.NewClient(deliver.PushPull, Url_Service_PULL)
|
defer func() {
|
client_push.Close()
|
client_pull.Close()
|
}()
|
go thSend()
|
|
go thRecv()
|
}
|
|
func thSend() {
|
for {
|
select {
|
case d := <-imgPushChan:
|
logger.Debug("imgPushChan in")
|
err := client_push.Send(d)
|
if err != nil {
|
logger.Debug("img Send err:", err)
|
}
|
default:
|
//logger.Debug("no img in")
|
}
|
}
|
}
|
|
func thRecv() {
|
for {
|
resultBytes, err := client_pull.Recv()
|
if err != nil {
|
//logger.Debug("pull err:", err)
|
continue
|
}
|
rMsg := protomsg.SdkMessage{}
|
if err := proto.Unmarshal(resultBytes, &rMsg); err == nil {
|
i := protomsg.Image{}
|
bdata, err := util.UnCompress(rMsg.Data)
|
if err !=nil {
|
logger.Debug("uncompress err:",err)
|
continue
|
}
|
err = proto.Unmarshal(bdata, &i)
|
if err !=nil {
|
continue
|
}
|
perId := i.Id //数据id
|
if rMsg.Tasklab != nil && rMsg.Tasklab.Taskid == Virtual_FaceTaskId {
|
sdkInfos := rMsg.Tasklab.Sdkinfos
|
|
logger.Debug("Len(sdkInfos)=",len(sdkInfos))
|
for _,swt :=range sdkInfos{
|
//人脸检测的结果
|
if swt.Sdktype =="FaceDetect"{
|
logger.Debug("人脸检测结果长度:",len(swt.Sdkdata))
|
if len(swt.Sdkdata)>1{
|
var pfp protomsg.ParamFacePos
|
err := proto.Unmarshal(swt.Sdkdata, &pfp)
|
if err !=nil {
|
logger.Debug("faceDetect result unmarshal err:",err)
|
} else {
|
logger.Debug("检测人脸数:",len(pfp.Faces))
|
if len(pfp.Faces)>0{
|
resultMap.Write(perId,pfp.Faces)
|
}
|
//for _,face :=range pfp.Faces{
|
//logger.Debug("FacePos:",face.Pos)
|
//logger.Debug("ThftResult:",face.Result)
|
// base64Fea := base64.StdEncoding.EncodeToString(face.Feats)
|
// logger.Debug("perId:",perId)
|
// logger.Debug("faceFeature:",base64Fea)
|
//}
|
}
|
}
|
break
|
}
|
}
|
}
|
} else {
|
logger.Debug("recv msg Err:", err)
|
}
|
|
}
|
}
|