zhangzengfei
2024-05-18 3771d5551480964ab17f4c23a152df2482bf6470
更新设备操作
1个文件已删除
15个文件已修改
396 ■■■■■ 已修改文件
controller/captureCtl.go 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/subscribeCtl.go 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/systemCtl.go 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/ape.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/cascade.go 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/db.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/device.go 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/positions.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/subplatform.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/apeRepo.go 73 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/captureRepo.go 66 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
routes/subscribe.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/device.go 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/nvcs.go 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/http.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/ape.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/captureCtl.go
@@ -52,9 +52,6 @@
        go a.Repository.FaceForward(req.FaceListObject.FaceObject)
    }
    // 设备保活
    service.KeepDeviceAlive(face.DeviceID)
    rspMsg := vo.ResponseStatus{
        RequestURL:   c.FullPath(),
        StatusCode:   vo.StatusSuccess,
@@ -106,9 +103,6 @@
    if config.ForwardConf.SyncServer != "" {
        go a.Repository.FaceForward([]vo.FaceObject{face})
    }
    // 设备保活
    service.KeepDeviceAlive(videoLabel.IVADeviceID)
    rspMsg := vo.ResponseStatus{
        RequestURL:   c.FullPath(),
controller/subscribeCtl.go
@@ -15,14 +15,16 @@
type SubscribeController struct {
    Repository repository.SubscribeRepository
    Srv        repository.CaptureRepository
    Capture    repository.CaptureRepository
    Ape        repository.ApeRepository
}
// 构造函数
func NewSubscribeController() SubscribeController {
    svr := repository.NewSubscribeRepository()
    svr1 := repository.NewCaptureRepository()
    controller := SubscribeController{svr, svr1}
    svr2 := repository.NewApeRepository()
    controller := SubscribeController{svr, svr1, svr2}
    return controller
}
@@ -105,7 +107,7 @@
    c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp})
}
func (s SubscribeController) Notifications(c *gin.Context) {
func (s SubscribeController) VIIDNotifications(c *gin.Context) {
    var rsp vo.ResponseStatusList
    var req vo.RequestSubscribeNotificationBind
    if err := c.BindJSON(&req); err != nil {
@@ -125,8 +127,13 @@
        })
        // 转发
        if config.ForwardConf.SyncServer != "" {
            go s.Srv.FaceForward(msg.FaceObjectList.FaceObject)
        if config.ForwardConf.SyncServer != "" && len(msg.FaceObjectList.FaceObject) > 0 {
            go s.Capture.FaceForward(msg.FaceObjectList.FaceObject)
        }
        if len(msg.DeviceList.APEObject) > 0 {
            fromId := c.GetHeader("User-Identify")
            go s.Ape.HandleNotification(fromId, msg.DeviceList.APEObject)
        }
    }
controller/systemCtl.go
@@ -1,13 +1,13 @@
package controller
import (
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/service"
    "net/http"
    "sync"
    "time"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg/auth"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/repository"
    "gat1400Exchange/vo"
@@ -15,8 +15,9 @@
)
type SystemController struct {
    Auth    *auth.DigestAuth
    ApeRepo repository.ApeRepository
    Auth     *auth.DigestAuth
    ApeRepo  repository.ApeRepository
    DevCache *sync.Map // 缓存设备类型, id为key, value为 ape 采集设备, subPlatform 下级, cascade上级
}
// 构造函数
@@ -24,7 +25,12 @@
    svr := repository.NewApeRepository()
    controller := SystemController{ApeRepo: svr}
    controller.Auth = auth.NewDigestAuthenticator("Basic Realm", func(user, realm string) string {
    realm := "Basic Realm"
    if config.ServeConf.Realm != "" {
        realm = config.ServeConf.Realm
    }
    controller.Auth = auth.NewDigestAuthenticator(realm, func(user, realm string) string {
        if user == config.ServeConf.Username {
            return config.ServeConf.Password
        }
@@ -33,6 +39,10 @@
    })
    controller.Auth.PlainTextSecrets = true
    for k, v := range svr.CollectDeviceType() {
        controller.DevCache.Store(k, v)
    }
    return controller
}
@@ -61,10 +71,18 @@
    }
    if user == config.ServeConf.Username {
        s.DevCache.Store(req.RegisterObject.DeviceID, "ape")
        if err := s.ApeRepo.Create(req.RegisterObject.DeviceID); err != nil {
            logger.Warn("Create ape failure,%s", err.Error())
            c.AbortWithStatus(http.StatusInternalServerError)
            return
        }
    } else {
        // 未缓存的id, 可能是新添加的上下级, 更新缓存
        _, ok := s.DevCache.Load(req.RegisterObject.DeviceID)
        if ok {
            s.ApeRepo.CollectDeviceType()
        }
    }
@@ -79,8 +97,9 @@
        return
    }
    service.KeepDeviceAlive(req.KeepaliveObject.DeviceID)
    s.ApeRepo.Keepalive(req.KeepaliveObject.DeviceID)
    if devType, ok := s.DevCache.Load(req.KeepaliveObject.DeviceID); ok {
        s.ApeRepo.Keepalive(req.KeepaliveObject.DeviceID, devType.(int))
    }
    rspMsg := vo.ResponseStatus{
        RequestURL:   c.FullPath(),
models/ape.go
@@ -8,6 +8,7 @@
type Ape struct {
    Id            string `gorm:"column:id;primary_key;" json:"id"`
    Name          string `gorm:"column:name" json:"name"`
    FromId        string `gorm:"column:from_id" json:"from_id"`
    HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
    Ext           vo.Ape `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"`
    CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
@@ -27,8 +28,8 @@
    return db.Table(a.TableName()).Save(a).Error
}
func (a *Ape) Keepalive() error {
    return db.Table(a.TableName()).Where("id = ?", a.Id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
func (a *Ape) Keepalive(id string) error {
    return db.Table(a.TableName()).Where("id = ?", id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
}
func (a *Ape) FindAll() ([]Ape, error) {
models/cascade.go
@@ -1,19 +1,35 @@
package models
import "time"
type Cascade struct {
    Id         string   `gorm:"column:id;primary_key;" json:"id"`
    Name       string   `gorm:"column:name" json:"name"`
    Username   string   `gorm:"column:username" json:"username"`
    Password   string   `gorm:"column:password" json:"password"`
    IP         string   `gorm:"column:ip" json:"ip"`
    Port       int      `gorm:"column:port" json:"port"`
    Enabled    bool     `gorm:"column:enabled" json:"enabled"`
    DeviceIDs  []string `gorm:"column:device_ids;type:text[];default '{}'" json:"device_ids"`
    CreateTime int64    `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime int64    `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime int64    `gorm:"column:delete_time" json:"-"`
    Id            string   `gorm:"column:id;primary_key;" json:"id"`
    Name          string   `gorm:"column:name" json:"name"`
    Username      string   `gorm:"column:username" json:"username"`
    Password      string   `gorm:"column:password" json:"password"`
    IP            string   `gorm:"column:ip" json:"ip"`
    Port          int      `gorm:"column:port" json:"port"`
    Enabled       bool     `gorm:"column:enabled" json:"enabled"`
    DeviceIDs     []string `gorm:"column:device_ids;type:text[];default '{}'" json:"device_ids"`
    HeartbeatTime string   `gorm:"column:heartbeat_time;" json:"heartbeat_time"`
    CreateTime    int64    `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime    int64    `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime    int64    `gorm:"column:delete_time" json:"-"`
}
func (c *Cascade) TableName() string {
    return "cascades"
}
func (c *Cascade) FindAll() ([]Cascade, error) {
    var list []Cascade
    if err := db.Table(c.TableName()).Find(&list).Error; err != nil {
        return nil, err
    }
    return list, nil
}
func (c *Cascade) Keepalive(id string) error {
    return db.Table(c.TableName()).Where("id = ?", id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
}
models/db.go
@@ -24,7 +24,6 @@
        &Ape{},
        &Cache{},
        &Cascade{},
        &Device{},
        &Positions{},
        &SubPlatform{},
        &Subscribe{},
models/device.go
File was deleted
models/positions.go
@@ -14,6 +14,9 @@
    return "positions"
}
func (d *Positions) Save() error {
    return db.Table(d.TableName()).Save(d).Error
}
func (d *Positions) FindDevicePosition(devId string, timestamp int64) error {
    return db.Table(d.TableName()).Where("device_id = ? AND create_time <= ?", devId, timestamp).Order("create_time desc").First(&d).Error
}
models/subplatform.go
@@ -1,8 +1,9 @@
package models
import "time"
type SubPlatform struct {
    Id            string `gorm:"primary_key;" json:"id"`
    HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
    Name          string `gorm:"not null;default:''" json:"name"`
    UserName      string `gorm:"not null;default:''" json:"user_name"`
    Realm         string `gorm:"not null;default:''" json:"realm"`
@@ -10,6 +11,7 @@
    Description   string `gorm:"not null;default:''" json:"description"`
    RemoteIP      string `gorm:"not null;default:''" json:"remote_ip"`
    RemotePort    int    `gorm:"not null;default:0" json:"remote_port"`
    HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
    CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime    int64  `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime    int64  `gorm:"column:delete_time" json:"-"`
@@ -39,3 +41,7 @@
    return list, nil
}
func (s *SubPlatform) Keepalive(id string) error {
    return db.Table(s.TableName()).Where("id = ?", id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
}
repository/apeRepo.go
@@ -7,6 +7,12 @@
    "gat1400Exchange/vo"
)
const (
    deviceTypeApe int = iota
    deviceTypeCascade
    deviceTypeSubPlatform
)
type ApeRepository struct {
}
@@ -14,15 +20,18 @@
    return ApeRepository{}
}
func (a *ApeRepository) Keepalive(id string) error {
    var ape models.Ape
    // 设备存在
    if err := ape.FindById(id); err != nil {
        return nil
func (a *ApeRepository) Keepalive(id string, devType int) {
    switch devType {
    case deviceTypeApe:
        var m models.Ape
        m.Keepalive(id)
    case deviceTypeCascade:
        var m models.Cascade
        m.Keepalive(id)
    case deviceTypeSubPlatform:
        var m models.SubPlatform
        m.Keepalive(id)
    }
    return ape.Keepalive()
}
func (a *ApeRepository) Create(id string) error {
@@ -80,3 +89,51 @@
    return ape.Save()
}
func (a *ApeRepository) HandleNotification(fromId string, apes []vo.Ape) {
    for _, a := range apes {
        var ape models.Ape
        err := ape.FindById(a.ApeID)
        if err == nil {
            ape.Ext = a
        } else {
            ape = models.Ape{
                Id:            a.ApeID,
                Name:          a.Name,
                FromId:        fromId,
                HeartbeatTime: "",
                Ext:           a,
            }
        }
        ape.Save()
    }
}
func (a *ApeRepository) CollectDeviceType() map[string]int {
    var ret = make(map[string]int, 0)
    var ap models.Ape
    if list, err := ap.FindAll(); err == nil {
        for _, item := range list {
            ret[item.Id] = deviceTypeApe
        }
    }
    var ca models.Cascade
    if list, err := ca.FindAll(); err == nil {
        for _, item := range list {
            ret[item.Id] = deviceTypeCascade
        }
    }
    var su models.SubPlatform
    if list, err := su.FindAll(); err == nil {
        for _, item := range list {
            ret[item.Id] = deviceTypeSubPlatform
        }
    }
    return ret
}
repository/captureRepo.go
@@ -11,8 +11,6 @@
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/util"
    "gat1400Exchange/vo"
    uuid "github.com/satori/go.uuid"
)
type CaptureRepository struct {
@@ -117,60 +115,6 @@
    return
}
func (c CaptureRepository) PackPushData(deviceId, faceId, faceImage, appearTime string) *vo.PushDataInfo {
    var pd = new(vo.PushDataInfo)
    var device models.Device
    if err := device.FindById(deviceId); err != nil {
        logger.Warn("Can't find device in database, device:%s, %s", deviceId, err.Error())
        return pd
    }
    // 匹配楼层
    faceAppearTime, err := time.ParseInLocation("20060102150405", appearTime, time.Local)
    if err != nil {
        logger.Warn("Parse face appear time error, %s", err.Error())
        faceAppearTime = time.Now()
    }
    // 电梯停止的时间
    devStopTime := time.Now().Format("2006-01-02 15:04:05")
    var devPos models.Positions
    _ = devPos.FindDevicePosition(deviceId, faceAppearTime.Unix()+5) // 加5秒电梯关门的时间
    if devPos.TimeString != "" {
        devStopTime = devPos.TimeString
    }
    imageBytes, err := base64.StdEncoding.DecodeString(faceImage)
    if err != nil {
        logger.Warn("Decode Image Base64 String failure, %s", err.Error())
        return pd
    }
    pd.PicMaxImages = append(pd.PicMaxImages, imageBytes)
    tr := vo.TaskResultInfo{
        Id:            uuid.NewV4().String(),
        CameraId:      deviceId,
        CameraAddr:    device.Addr + devPos.Pos,
        CameraName:    device.Name,
        PicMaxUrl:     []string{""},
        PicDate:       faceAppearTime.Format("2006-01-02 15:04:05"),
        LikeDate:      devStopTime,
        AnalyServerId: deviceId,
        DataSource:    "camera",
        TargetInfo:    []vo.TargetInfo{{TargetId: faceId}},
    }
    pd.SourceData = vo.ESInfo{
        TaskResultInfo: tr,
        Version:        "3.3",
        UpdateTime:     time.Now().Format("2006-01-02 15:04:05"),
    }
    return pd
}
func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 {
    var pd = new(vo.PushDataInfoV2)
    var floor string
@@ -182,19 +126,9 @@
    }
    if config.ServeConf.Role == "server" {
        var device models.Device
        if err := device.FindById(deviceId); err != nil {
            logger.Warn("Can't find device in database, device:%s, %s", deviceId, err.Error())
            return pd
        }
        // 匹配楼层
        var devPos models.Positions
        _ = devPos.FindDevicePosition(deviceId, faceAppearTime.Unix()+5) // 加5秒电梯关门的时间
        if devPos.Pos == "" {
            devPos.Pos = device.Floor
        }
        floor = devPos.Pos
    }
routes/subscribe.go
@@ -12,7 +12,7 @@
    r.POST("/Subscribes", subCtl.VIIDSubscribes)
    r.PUT("/Subscribes", subCtl.VIIDUpdateSubscribes)
    r.DELETE("/Subscribes", subCtl.VIIDDeleteSubscribe)
    r.POST("/SubscribeNotifications", subCtl.Notifications)
    r.POST("/SubscribeNotifications", subCtl.VIIDNotifications)
    return r
}
service/device.go
@@ -2,21 +2,14 @@
import (
    "encoding/json"
    "fmt"
    "time"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/models"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/util"
    "github.com/hashicorp/golang-lru/v2/expirable"
)
var deviceAliveCache *expirable.LRU[string, bool]
func init() {
    deviceAliveCache = expirable.NewLRU[string, bool](100, nil, time.Second*60)
}
type DevReportData struct {
    Code        string                 `json:"code"`        // 设备ID
@@ -45,25 +38,32 @@
        return nil
    }
    logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer)
    logger.Info("Start device report task, server:%s.", config.ForwardConf.ReportServer)
    var d models.Device
    var d models.Ape
    devices, err := d.FindAll()
    if err != nil {
        return err
    }
    for _, dev := range devices{
        if _, exists := deviceAliveCache.Get(dev.Id); !exists {
            continue
    for _, dev := range devices {
        // 级联的设备不检查心跳
        if dev.FromId == "" {
            hTime, err := time.ParseInLocation("2006-01-02 15:04:05", dev.HeartbeatTime, time.Local)
            if err != nil || hTime.Unix()+120 < time.Now().Unix() {
                continue
            }
        }
        device := DevReportData{
            Code:      dev.Id,
            Model:     dev.Ext.Model,
            Type:      "camera",
            Name:      dev.Name,
            Addr:      dev.Addr+dev.Floor,
            IpAddr:    dev.Ip,
            Addr:      dev.Ext.Place,
            IpAddr:    dev.Ext.IPAddr,
            Latitude:  fmt.Sprintf("%f", dev.Ext.Latitude),
            Longitude: fmt.Sprintf("%f", dev.Ext.Longitude),
        }
        data, err := json.Marshal(device)
@@ -71,9 +71,7 @@
            return err
        }
        logger.Info("Report device info. %+v", dev)
        headers := map[string]string{"Content-Type": "applicaiton/json; charset=UTF-8"}
        headers := map[string]string{"Content-Type": "application/json; charset=UTF-8"}
        _, err = util.HttpPost(config.ForwardConf.ReportServer, headers, data)
        if err != nil {
            return err
@@ -81,18 +79,4 @@
    }
    return nil
}
func KeepDeviceAlive(id string) {
    // 上报设备信息
    var d = models.Device{
        Id: id,
    }
    err := d.Upsert()
    if err != nil {
        logger.Warn("Device db update camera error:%s", err.Error())
    }
    deviceAliveCache.Add(id, true)
}
service/nvcs.go
@@ -7,6 +7,7 @@
    "io/ioutil"
    "net"
    "strings"
    "time"
    "gat1400Exchange/config"
    "gat1400Exchange/models"
@@ -75,12 +76,12 @@
        decodedBytes, err := ioutil.ReadAll(reader)
        var data ElevatorData
        // Unmarshal JSON into the struct
        err = json.Unmarshal(decodedBytes, &data)
        if err != nil {
            logger.Warn("ElevatorData unmarshal error:%s", err.Error())
            continue
        }
        logger.Debug("Received %d bytes from %s, %+v", numBytes, clientAddr, data)
        if len(data.Elevator) == 0 {
            continue
@@ -100,20 +101,17 @@
            }
        }
        var d = models.Device{
            Id:    elevator.Name,
            Floor: elevator.Status.FloorName,
            Ip:    elevator.IP,
        var d = models.Positions{
            DeviceId:   elevator.Name,
            Pos:        elevator.Status.FloorName,
            CreateTime: time.Now().Unix(),
            TimeString: time.Now().Format("2006-01-02 15:04:05"),
        }
        err = d.Upsert()
        err = d.Save()
        if err != nil {
            logger.Warn("Device db update error:%s", err.Error())
            logger.Warn("Device position update error:%s", err.Error())
        }
        deviceAliveCache.Add(elevator.Name, true)
        logger.Debug("Received %d bytes from %s, %+v", numBytes, clientAddr, data)
    }
}
util/http.go
@@ -37,7 +37,6 @@
        return nil, err
    }
    //req.Header.Set("Content-Type", "applicaiton/json; charset=UTF-8")
    if header != nil {
        for k, v := range header {
            req.Header.Set(k, v)
vo/ape.go
@@ -6,6 +6,7 @@
    "errors"
)
// 直接注册的采集设备, Device包含这些设备, 分开存只是为了级联上报
type Ape struct {
    ApeID            string  `json:"ApeID"`
    Name             string  `json:"Name"`