| | |
| | | ) |
| | | |
// URL path suffixes of the VIID capture resources. PersonCapture appends
// PersonsUrI to the configured server address when posting person messages.
const (
	FacesUrI   = "/VIID/Faces"
	PersonsUrI = "/VIID/Persons"
)
| | | |
| | | func FaceCapture(msg []byte) int { |
| | |
| | | logger.Debug("Post faces success.") |
| | | return stat.StatusCode |
| | | } |
| | | |
| | | func PersonCapture(msg []byte) int { |
| | | if clientStatus != vo.StatusSuccess { |
| | | return clientStatus |
| | | } |
| | | |
| | | url := fmt.Sprintf("%s://%s:%s%s", config.ClientConf.Proto, config.ClientConf.ServerAddr, config.ClientConf.ServerPort, PersonsUrI) |
| | | rsp, err := util.HttpPost(url, headers, msg) |
| | | if err != nil { |
| | | logger.Warn("Post person failed, %s", err.Error()) |
| | | return vo.StatusOtherError |
| | | } |
| | | |
| | | var stat vo.ResponseStatus |
| | | err = json.Unmarshal(rsp, &stat) |
| | | if err != nil { |
| | | logger.Warn("Post person response unmarshal failed, %s", err.Error()) |
| | | return vo.StatusOtherError |
| | | } |
| | | |
| | | logger.Debug("Post person success.") |
| | | |
| | | return stat.StatusCode |
| | | } |
| | |
| | | return vo.StatusOtherError |
| | | } |
| | | |
| | | logger.Debug("Post notification success.") |
| | | logger.Debug("Post subscribe success.") |
| | | |
| | | return vo.StatusSuccess |
| | | } |
| | |
| | | return vo.StatusOtherError |
| | | } |
| | | |
| | | logger.Debug("Post notification success.") |
| | | logger.Debug("put subscribe success.") |
| | | |
| | | return vo.StatusSuccess |
| | | } |
| | |
| | | func GetSubscribes(url string) ([]byte, error) { |
| | | rsp, err := util.HttpGet(url, headers) |
| | | if err != nil { |
| | | logger.Warn("Put subscribe failed, %s", err.Error()) |
| | | |
| | | logger.Warn("Get subscribe failed, %s", err.Error()) |
| | | } |
| | | |
| | | return rsp, err |
| | |
| | | |
| | | import ( |
| | | "encoding/base64" |
| | | "gat1400Exchange/config" |
| | | "gat1400Exchange/pkg" |
| | | "gat1400Exchange/service" |
| | | "fmt" |
| | | "math/rand" |
| | | "net/http" |
| | | "strconv" |
| | | "time" |
| | | |
| | | "gat1400Exchange/config" |
| | | "gat1400Exchange/pkg" |
| | | "gat1400Exchange/pkg/logger" |
| | | "gat1400Exchange/repository" |
| | | "gat1400Exchange/service" |
| | | "gat1400Exchange/vo" |
| | | |
| | | "github.com/gin-gonic/gin" |
| | |
| | | |
| | | // 如果开启了下级, 身份应该是消息代理, 不再转发到服务器 |
| | | if config.ClientConf.Enable && config.ServeConf.Role == "agent" { |
| | | go a.Repository.VIIDMsgForward(&req) |
| | | go a.Repository.VIIDFaceMsgForward(&req) |
| | | } else if config.ServeConf.Role == "cascade" { |
| | | go service.AddFaceNotification(&face) |
| | | } |
| | |
| | | videoLabel := req.VideoLabelListObject.VideoLabelObject[0] |
| | | logger.Debug("Receive new message, Id:%s Ip:%s ", videoLabel.VideoLabelID, c.RemoteIP()) |
| | | |
| | | // 转人脸消息 |
| | | var face vo.FaceObject |
| | | face.FaceID = videoLabel.VideoLabelID |
| | | face.InfoKind = 1 |
| | | face.SourceID = videoLabel.VideoImageID |
| | | face.DeviceID = videoLabel.IVADeviceID |
| | | face.LocationMarkTime = videoLabel.CreateTimeAbs |
| | | face.FaceAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime |
| | | face.FaceDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime |
| | | // 转人员消息 |
| | | var person vo.PersonObject |
| | | person.PersonID = videoLabel.VideoLabelID[0:41] + "01" + videoLabel.VideoLabelID[43:48] |
| | | person.InfoKind = "1" |
| | | person.DeviceID = videoLabel.IVADeviceID |
| | | person.SourceID = videoLabel.VideoImageID |
| | | person.LocationMarkTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime |
| | | person.PersonAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime |
| | | person.PersonDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime |
| | | person.IsDriver = 2 |
| | | person.IsCriminalInvolved = 2 |
| | | |
| | | var hasTargetImage bool |
| | | var bgImageWith, bgImageHeight int |
| | |
| | | bgImage = &videoLabel.SubImageList.SubImageInfoObject[idx] |
| | | } |
| | | |
| | | face.SubImageList.SubImageInfoObject = append( |
| | | face.SubImageList.SubImageInfoObject, |
| | | person.SubImageList.SubImageInfoObject = append( |
| | | person.SubImageList.SubImageInfoObject, |
| | | videoLabel.SubImageList.SubImageInfoObject[idx], |
| | | ) |
| | | } |
| | |
| | | return |
| | | } |
| | | |
| | | imageId := bgImage.ImageID[37:41] |
| | | imageNumber, err := strconv.Atoi(imageId) |
| | | if err != nil { |
| | | imageNumber = rand.Intn(90000) + 10000 |
| | | } else { |
| | | imageNumber++ |
| | | } |
| | | var subImageInfo = vo.SubImageInfoObject{ |
| | | ImageID: bgImage.ImageID + "1", |
| | | ImageID: fmt.Sprintf("%s%d", bgImage.ImageID[0:37], imageNumber), |
| | | EventSort: 10, |
| | | DeviceID: bgImage.DeviceID, |
| | | StoragePath: "", |
| | |
| | | Height: subRect.Top - subRect.Bottom, |
| | | Data: base64.StdEncoding.EncodeToString(subImage), |
| | | } |
| | | face.SubImageList.SubImageInfoObject = append( |
| | | face.SubImageList.SubImageInfoObject, |
| | | person.SubImageList.SubImageInfoObject = append( |
| | | person.SubImageList.SubImageInfoObject, |
| | | subImageInfo, |
| | | ) |
| | | } |
| | | |
| | | // 如果开启了下级, 身份应该是消息代理, 不再转发到服务器 |
| | | if config.ClientConf.Enable && config.ServeConf.Role == "agent" { |
| | | var faceObjList vo.RequestFaceList |
| | | faceObjList.FaceListObject.FaceObject = append(faceObjList.FaceListObject.FaceObject, face) |
| | | go a.Repository.VIIDMsgForward(&faceObjList) |
| | | var personList vo.RequestPersonList |
| | | personList.PersonListObject.PersonObject = append(personList.PersonListObject.PersonObject, person) |
| | | go a.Repository.VIIDPersonMsgForward(&personList) |
| | | } else if config.ServeConf.Role == "cascade" { |
| | | go service.AddFaceNotification(&face) |
| | | go service.AddPersonNotification(&person) |
| | | } |
| | | |
| | | if config.ForwardConf.SyncServer != "" { |
| | | go a.Repository.FaceForward([]vo.FaceObject{face}) |
| | | go a.Repository.PersonForward([]vo.PersonObject{person}) |
| | | } |
| | | |
| | | rspMsg := vo.ResponseStatus{ |
| | |
| | | // 启动网络视频字符叠加器服务 |
| | | go service.StartNVCSServer() |
| | | |
| | | go service.InitSubscribeTask() |
| | | go service.InitSubscribeNotificationTasks() |
| | | |
| | | // 启动定时任务 |
| | | cron.Init() |
| | |
| | | logger.Error("Server forced to shutdown:", err) |
| | | } |
| | | |
| | | service.StopSubscribeTask() |
| | | service.StopNotificationTasks() |
| | | |
| | | logger.Info("Server exiting!") |
| | | } |
| | |
| | | return |
| | | } |
| | | |
| | | func (c CaptureRepository) PersonForward(personList []vo.PersonObject) { |
| | | var err error |
| | | |
| | | if personList == nil || len(personList) == 0 { |
| | | logger.Warn("FaceList is nil") |
| | | return |
| | | } |
| | | |
| | | for _, v := range personList { |
| | | if v.SubImageList.SubImageInfoObject == nil { |
| | | logger.Warn("SubImageInfoObject is nil") |
| | | continue |
| | | } |
| | | var deviceId = v.DeviceID |
| | | var targetId = v.PersonID |
| | | var bgImageStr string |
| | | var bgImageBytes, faceImageBytes []byte = nil, nil |
| | | |
| | | // 获取大图, 目前海康的小图分辨率太低 |
| | | for _, image := range v.SubImageList.SubImageInfoObject { |
| | | if image.Type != "14" { |
| | | continue |
| | | } |
| | | |
| | | if len(image.Data) > 0 { |
| | | if len(image.Data) > len(bgImageStr) { |
| | | bgImageStr = image.Data |
| | | } |
| | | } else if image.StoragePath != "" { |
| | | imgData, err := util.ImageDownload(image.StoragePath, nil) |
| | | if err != nil { |
| | | logger.Warn("Image download failure, %s", err.Error()) |
| | | } else { |
| | | bgImageBytes = imgData |
| | | bgImageStr = "///" |
| | | } |
| | | } |
| | | } |
| | | |
| | | if bgImageBytes == nil { |
| | | bgImageBytes, err = base64.StdEncoding.DecodeString(bgImageStr) |
| | | if err != nil { |
| | | logger.Warn("Decode Image Base64 String failure, %s", err.Error()) |
| | | continue |
| | | } |
| | | } |
| | | |
| | | // 转发图像 |
| | | logger.Debug("Prepare forward person image, deviceId:%s, image len:%d, server:%s", deviceId, len(bgImageBytes), config.ForwardConf.SyncServer) |
| | | if deviceId != "" && bgImageStr != "" && config.ForwardConf.SyncServer != "" { |
| | | pd := c.PackPushDataV2(deviceId, targetId, v.PersonAppearTime, bgImageBytes, faceImageBytes) |
| | | if pd == nil { |
| | | return |
| | | } |
| | | |
| | | // 处理梯控填写的楼层信息 暂时使用oherFeature字段 |
| | | if v.BehaviorDescription != "" { |
| | | pd.CameraFloor = v.BehaviorDescription |
| | | } |
| | | |
| | | // 尝试从faceId提取楼层 |
| | | if pd.CameraFloor == "" && config.ClientConf.AddFloorToFaceId { |
| | | pd.CameraFloor, _ = pkg.ParseFloorFromId(v.PersonID) |
| | | } |
| | | //logger.Debug("device %s, CameraFloor:%s", deviceId, pd.CameraFloor) |
| | | |
| | | payload, err := json.Marshal(pd) |
| | | if err != nil { |
| | | logger.Warn("Marshal error, %s", err.Error()) |
| | | return |
| | | } |
| | | |
| | | if !util.SendData(payload, config.ForwardConf.SyncServer) { |
| | | cacheItem, _ := json.Marshal(pd) |
| | | c.CacheData(cacheItem, "basic") |
| | | logger.Warn("The data forwarding failed, adding to local cache.") |
| | | } else { |
| | | logger.Debug("The data forwarding successful. deviceId:%s", deviceId) |
| | | } |
| | | } |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 { |
| | | var pd = new(vo.PushDataInfoV2) |
| | | var floor string |
| | |
| | | cacheItem.Save() |
| | | } |
| | | |
| | | func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) { |
| | | func (c CaptureRepository) VIIDFaceMsgForward(msg *vo.RequestFaceList) { |
| | | faceInfo := msg.FaceListObject.FaceObject[0] |
| | | // 匹配楼层 |
| | | faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local) |
| | |
| | | b, _ := json.Marshal(msg) |
| | | if client.FaceCapture(b) != vo.StatusSuccess { |
| | | cacheItem, _ := json.Marshal(msg) |
| | | c.CacheData(cacheItem, "1400") |
| | | c.CacheData(cacheItem, "1400-face") |
| | | logger.Warn("The data forwarding failed, adding to local cache.") |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func (c CaptureRepository) VIIDPersonMsgForward(msg *vo.RequestPersonList) { |
| | | person := msg.PersonListObject.PersonObject[0] |
| | | // 匹配楼层 |
| | | appearTime, err := time.ParseInLocation("20060102150405", person.PersonAppearTime, time.Local) |
| | | if err != nil { |
| | | logger.Warn("Parse face appear time error, %s", err.Error()) |
| | | appearTime = time.Now() |
| | | } |
| | | |
| | | var devPos models.Positions |
| | | _ = devPos.FindPositionByTime(appearTime.Unix() + 5) // 加5秒电梯关门的时间 |
| | | if devPos.Pos == "" { |
| | | devPos.Pos = "1F" |
| | | } |
| | | |
| | | for idx, v := range msg.PersonListObject.PersonObject { |
| | | msg.PersonListObject.PersonObject[idx].BehaviorDescription = devPos.Pos |
| | | if config.ClientConf.AddFloorToFaceId { |
| | | msg.PersonListObject.PersonObject[idx].PersonID = pkg.GenerateFaceIdContainFloor(v.PersonID, devPos.Pos) |
| | | } |
| | | } |
| | | |
| | | b, _ := json.Marshal(msg) |
| | | if client.PersonCapture(b) != vo.StatusSuccess { |
| | | cacheItem, _ := json.Marshal(msg) |
| | | c.CacheData(cacheItem, "1400-person") |
| | | logger.Warn("The data forwarding failed, adding to local cache.") |
| | | } |
| | | |
| | |
| | | return err |
| | | } |
| | | |
| | | service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) |
| | | service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) |
| | | |
| | | return err |
| | | } |
| | |
| | | sub.Status = subscribe.SubscribeStatus |
| | | sub.Ext = *subscribe |
| | | |
| | | service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) |
| | | service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) |
| | | |
| | | return sub.Save() |
| | | } |
| | |
| | | return err |
| | | } |
| | | |
| | | service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil) |
| | | service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil) |
| | | |
| | | return err |
| | | } |
New file |
| | |
| | | package service |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "gat1400Exchange/pkg/logger" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "gat1400Exchange/client" |
| | | "gat1400Exchange/models" |
| | | "gat1400Exchange/pkg/snowflake" |
| | | "gat1400Exchange/vo" |
| | | ) |
| | | |
| | | type NotificationTask struct { |
| | | ctx context.Context |
| | | conf *models.Subscribe |
| | | faceList []*vo.FaceObject |
| | | personList []*vo.PersonObject |
| | | mutex sync.Mutex |
| | | lastApeExecTime int64 |
| | | } |
| | | |
| | | func (task *NotificationTask) start() { |
| | | TaskWaitGroup.Add(1) |
| | | Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) |
| | | |
| | | for { |
| | | select { |
| | | case <-task.ctx.Done(): |
| | | TaskProcMap.Delete(task.conf.Id) |
| | | TaskWaitGroup.Done() |
| | | logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) |
| | | return |
| | | case <-Timer.C: |
| | | task.notify() |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (task *NotificationTask) addFace(face *vo.FaceObject) { |
| | | if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { |
| | | return |
| | | } |
| | | |
| | | task.mutex.Lock() |
| | | defer task.mutex.Unlock() |
| | | |
| | | task.faceList = append(task.faceList, face) |
| | | } |
| | | |
| | | func (task *NotificationTask) addPerson(person *vo.PersonObject) { |
| | | if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) { |
| | | return |
| | | } |
| | | |
| | | task.mutex.Lock() |
| | | defer task.mutex.Unlock() |
| | | |
| | | task.personList = append(task.personList, person) |
| | | } |
| | | |
| | | func (task *NotificationTask) notify() { |
| | | subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") |
| | | for _, subType := range subDetails { |
| | | var msg *vo.RequestSubscribeNotification |
| | | switch subType { |
| | | case vo.SubscribeApe: |
| | | msg = task.packDeviceNotificationMsg() |
| | | case vo.SubscribeFace: |
| | | msg = task.packFaceNotificationMsg() |
| | | case vo.SubscribePerson: |
| | | } |
| | | |
| | | if msg != nil { |
| | | b, _ := json.Marshal(msg) |
| | | client.Notify(task.conf.Ext.ReceiveAddr, b) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification { |
| | | triggerTime := time.Now().Format("20060102150405") |
| | | if len(task.faceList) == 0 { |
| | | return nil |
| | | } |
| | | |
| | | var notification = vo.FaceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | |
| | | var ids []string |
| | | for idx, _ := range task.faceList { |
| | | // 按海康示例填充修改字段 |
| | | task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime |
| | | task.faceList[idx].EntryTime = triggerTime |
| | | for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { |
| | | task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 |
| | | } |
| | | |
| | | ids = append(ids, task.faceList[idx].FaceID) |
| | | notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | task.mutex.Lock() |
| | | task.faceList = []*vo.FaceObject{} |
| | | task.mutex.Unlock() |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification |
| | | |
| | | return &req |
| | | } |
| | | |
| | | func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification { |
| | | // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新, |
| | | if time.Now().Unix()-task.lastApeExecTime < 60*10 { |
| | | return nil |
| | | } |
| | | |
| | | triggerTime := time.Now().Format("20060102150405") |
| | | |
| | | task.lastApeExecTime = time.Now().Unix() |
| | | |
| | | var notification = vo.DeviceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | |
| | | var ids []string |
| | | var apeMod models.Ape |
| | | apeList, err := apeMod.FindAll() |
| | | if err != nil { |
| | | logger.Warn(err.Error()) |
| | | return nil |
| | | } else { |
| | | for idx, _ := range apeList { |
| | | ids = append(ids, apeList[idx].Id) |
| | | notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) |
| | | } |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification |
| | | |
| | | return &req |
| | | } |
| | |
| | | logger.Error(err.Error()) |
| | | return |
| | | } |
| | | if c.Type == "1400" { |
| | | if c.Type == "1400-face" { |
| | | if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess { |
| | | c.UpdateRetryCount() |
| | | logger.Warn("The data resend failed. retry count %d", c.Retry+1) |
| | | return |
| | | } |
| | | } else if c.Type == "1400-person" { |
| | | if client.PersonCapture([]byte(c.Data)) != vo.StatusSuccess { |
| | | c.UpdateRetryCount() |
| | | logger.Warn("The data resend failed. retry count %d", c.Retry+1) |
| | | return |
| | | } |
| | | } else { |
| | | if !util.SendData([]byte(c.Data), config.ForwardConf.SyncServer) { |
| | | c.UpdateRetryCount() |
| | |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "gat1400Exchange/config" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "gat1400Exchange/client" |
| | | "gat1400Exchange/config" |
| | | "gat1400Exchange/models" |
| | | "gat1400Exchange/pkg/logger" |
| | | "gat1400Exchange/pkg/snowflake" |
| | | "gat1400Exchange/vo" |
| | | ) |
| | | |
| | |
| | | |
| | | type TaskProcInfo struct { |
| | | cancel context.CancelFunc |
| | | task *SubscribeTask |
| | | task *NotificationTask |
| | | } |
| | | |
| | | func (t *TaskProcInfo) stop() { |
| | | func (t TaskProcInfo) stop() { |
| | | t.cancel() |
| | | t.task = nil |
| | | } |
| | | |
| | | func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { |
| | | logger.Debug("Receive sub msg: %s ", subId) |
| | | func InitSubscribeNotificationTasks() { |
| | | var s models.Subscribe |
| | | subList, err := s.FindByFromId(config.ClientConf.ServerId) |
| | | if err != nil { |
| | | logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error()) |
| | | return |
| | | } |
| | | |
| | | for idx := range subList { |
| | | if subList[idx].Status != 0 { |
| | | continue |
| | | } |
| | | |
| | | CreateNotificationTask(&subList[idx]) |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) { |
| | | logger.Debug("Receive update notify task msg: %s %d", subId, msgType) |
| | | |
| | | proc, isExist := TaskProcMap.Load(subId) |
| | | |
| | | switch msgType { |
| | | case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: |
| | | if isExist { |
| | | logger.Debug("更新任务, 退出重新开始") |
| | | proc.(TaskProcInfo).cancel() |
| | | logger.Debug("Update notify task, restart after exit") |
| | | proc.(TaskProcInfo).stop() |
| | | } |
| | | |
| | | CreateTask(conf) |
| | | CreateNotificationTask(conf) |
| | | case vo.Msg_Type_Delete_Subscribe: |
| | | if !isExist { |
| | | return |
| | | } |
| | | |
| | | // 关闭任务, 并删除 |
| | | proc.(TaskProcInfo).cancel() |
| | | proc.(TaskProcInfo).stop() |
| | | default: |
| | | logger.Warn("未知的消息类型") |
| | | logger.Warn("Invalid msg type %d", msgType) |
| | | } |
| | | } |
| | | |
| | | func InitSubscribeTask() { |
| | | var s models.Subscribe |
| | | subList, err := s.FindByFromId(config.ClientConf.ServerId) |
| | | if err != nil { |
| | | logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err) |
| | | return |
| | | func CreateNotificationTask(conf *models.Subscribe) { |
| | | logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title) |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | task := &NotificationTask{ |
| | | ctx: ctx, |
| | | conf: conf, |
| | | } |
| | | |
| | | for idx, _ := range subList { |
| | | if subList[idx].Status != 0 { |
| | | continue |
| | | } |
| | | |
| | | CreateTask(&subList[idx]) |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func AddFaceNotification(face *vo.FaceObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.AddFace(face) |
| | | return true |
| | | TaskProcMap.Store(conf.Id, TaskProcInfo{ |
| | | cancel: cancel, |
| | | task: task, |
| | | }) |
| | | |
| | | logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature) |
| | | go task.start() |
| | | } |
| | | |
| | | func StopSubscribeTask() { |
| | | func StopNotificationTasks() { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).cancel() |
| | | return true |
| | |
| | | TaskWaitGroup.Wait() |
| | | } |
| | | |
| | | func CreateTask(conf *models.Subscribe) { |
| | | logger.Debug("添加订阅任务,%s", conf.Id) |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | newTask := &SubscribeTask{ |
| | | ctx: ctx, |
| | | conf: conf, |
| | | } |
| | | |
| | | TaskProcMap.Store(conf.Id, TaskProcInfo{ |
| | | cancel: cancel, |
| | | task: newTask, |
| | | func AddFaceNotification(face *vo.FaceObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.addFace(face) |
| | | return true |
| | | }) |
| | | |
| | | go newTask.Start() |
| | | logger.Debug("Add Face Notification. faceId: %s", face.FaceID) |
| | | } |
| | | |
| | | type SubscribeTask struct { |
| | | ctx context.Context |
| | | conf *models.Subscribe |
| | | faceList []*vo.FaceObject |
| | | mutex sync.Mutex |
| | | lastApeExecTime int64 |
| | | } |
| | | func AddPersonNotification(person *vo.PersonObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.addPerson(person) |
| | | return true |
| | | }) |
| | | |
| | | func (task *SubscribeTask) Start() { |
| | | TaskWaitGroup.Add(1) |
| | | Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) |
| | | |
| | | for { |
| | | select { |
| | | case <-task.ctx.Done(): |
| | | TaskProcMap.Delete(task.conf.Id) |
| | | TaskWaitGroup.Done() |
| | | logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) |
| | | return |
| | | case <-Timer.C: |
| | | task.Notify() |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (task *SubscribeTask) AddFace(face *vo.FaceObject) { |
| | | if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { |
| | | return |
| | | } |
| | | |
| | | task.mutex.Lock() |
| | | defer task.mutex.Unlock() |
| | | |
| | | task.faceList = append(task.faceList, face) |
| | | } |
| | | |
| | | func (task *SubscribeTask) Notify() { |
| | | subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") |
| | | for _, subType := range subDetails { |
| | | triggerTime := time.Now().Format("20060102150405") |
| | | |
| | | // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新, |
| | | if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 { |
| | | task.lastApeExecTime = time.Now().Unix() |
| | | |
| | | var notification = vo.DeviceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | |
| | | var ids []string |
| | | var apeMod models.Ape |
| | | apeList, err := apeMod.FindAll() |
| | | if err != nil { |
| | | logger.Warn(err.Error()) |
| | | continue |
| | | } else { |
| | | for idx, _ := range apeList { |
| | | ids = append(ids, apeList[idx].Id) |
| | | notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) |
| | | } |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) |
| | | b, _ := json.Marshal(req) |
| | | client.Notify(task.conf.Ext.ReceiveAddr, b) |
| | | continue |
| | | } |
| | | |
| | | // 上报人脸 |
| | | if subType == vo.SubscribeFace { |
| | | if len(task.faceList) == 0 { |
| | | return |
| | | } |
| | | |
| | | var notification = vo.FaceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | var ids []string |
| | | for idx, _ := range task.faceList { |
| | | // 按海康示例填充修改字段 |
| | | task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime |
| | | task.faceList[idx].EntryTime = triggerTime |
| | | for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { |
| | | task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 |
| | | } |
| | | |
| | | ids = append(ids, task.faceList[idx].FaceID) |
| | | notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | task.mutex.Lock() |
| | | task.faceList = []*vo.FaceObject{} |
| | | task.mutex.Unlock() |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) |
| | | b, _ := json.Marshal(req) |
| | | client.Notify(task.conf.Ext.ReceiveAddr, b) |
| | | } |
| | | } |
| | | logger.Debug("Add Person Notification. personId: %s", person.PersonID) |
| | | } |