From 790c60c55054b3e75043eaed11eaef8584d2001d Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期四, 16 五月 2024 09:57:59 +0800 Subject: [PATCH] 添加级联,订阅功能 --- vo/constant.go | 25 + vo/face.go | 4 vo/subscribe.go | 76 ++++ vo/ape.go | 58 +++ repository/apeRepo.go | 82 ++++ models/ape.go | 41 ++ models/subplatform.go | 20 + controller/captureCtl.go | 5 service/subscribe.go | 213 ++++++++++++ routes/subscribe.go | 16 service/device.go | 4 client/notify.go | 31 + repository/subscribeRepo.go | 44 ++ controller/systemCtl.go | 58 +++ client/system.go | 2 vo/message.go | 4 service/resend.go | 8 client/faces.go | 3 routes/routes.go | 2 models/subscribe.go | 40 ++ routes/system.go | 4 pkg/snowflake/snowflake.go | 118 ++++++ config/config.go | 2 models/cascade.go | 19 + repository/captureRepo.go | 2 config/gat1400.yaml | 21 + models/db.go | 10 main.go | 4 controller/subscribeCtl.go | 80 ++++ 29 files changed, 976 insertions(+), 20 deletions(-) diff --git a/client/faces.go b/client/faces.go index 7dde6a4..c4f1f17 100644 --- a/client/faces.go +++ b/client/faces.go @@ -1,8 +1,9 @@ package client import ( - "encoding/json" "fmt" + + "encoding/json" "gat1400Exchange/config" "gat1400Exchange/pkg/logger" "gat1400Exchange/util" diff --git a/client/notify.go b/client/notify.go new file mode 100644 index 0000000..9532e84 --- /dev/null +++ b/client/notify.go @@ -0,0 +1,31 @@ +package client + +import ( + "encoding/json" + "gat1400Exchange/pkg/logger" + "gat1400Exchange/util" + "gat1400Exchange/vo" +) + +func Notify(url string, msg []byte) int { + if clientStatus != vo.StatusSuccess { + return clientStatus + } + + rsp, err := util.HttpPost(url, headers, msg) + if err != nil { + logger.Warn("Post notification failed, %s", err.Error()) + return vo.StatusOtherError + } + + var stat vo.ResponseStatusList + err = json.Unmarshal(rsp, &stat) + if err != nil { + logger.Warn("Post notification response unmarshal failed, %s", err.Error()) + return vo.StatusOtherError + } + + logger.Debug("Post notification success.") + + return vo.StatusSuccess +} diff --git a/client/system.go b/client/system.go index 80e8858..b2e540b 100644 --- a/client/system.go +++ b/client/system.go @@ -3,11 +3,11 @@ import ( "encoding/json" "fmt" - "gat1400Exchange/util" "io/ioutil" "gat1400Exchange/config" "gat1400Exchange/pkg/logger" + "gat1400Exchange/util" "gat1400Exchange/vo" dac "github.com/xinsnake/go-http-digest-auth-client" diff --git a/config/config.go b/config/config.go index bc53545..84b926e 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,8 @@ Mode string `mapstructure:"mode"` Host string `mapstructure:"host"` Port string `mapstructure:"port"` + Realm string `mapstructure:"realm"` + Username string `mapstructure:"username"` Password string `mapstructure:"password"` Role string `mapstructure:"role"` // agent 璁惧绔�, proxy 1400涓浆 涓嶅鐞哾eivce, server 鍏ㄥ姛鑳� } diff --git a/config/gat1400.yaml b/config/gat1400.yaml index 40ae1ce..f4aa22d 100644 --- a/config/gat1400.yaml +++ b/config/gat1400.yaml @@ -1,11 +1,26 @@ # web 鏈嶅姟閰嶇疆 serve: # 璁惧畾妯″紡(debug/release/test,姝e紡鐗堟敼涓簉elease) - id: "11010500011121000001" + id: "12312312315031231233" mode: "debug" port: "1400" - host: "0.0.0.0" - password: "basic1400server" + host: "192.168.20.119" + username: "admin" + password: "Aa123456" + realm: "Basic Realm" + role: "cascade" +client: + enable: true + device-id: "12312312315031200003" + username: "12312312315031200003" + password: "123456" + server-addr: "192.168.20.189" + server-port: 1400 + proto: http + upload-type: binary + channel-number: "12312312315031200003" + heartbeat-interval: 30 + heartbeat-count: 3 # 鏃ュ織閰嶇疆 log: diff --git a/controller/captureCtl.go b/controller/captureCtl.go index ef00027..dbf3b31 100644 --- a/controller/captureCtl.go +++ b/controller/captureCtl.go @@ -41,7 +41,10 @@ // 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒 face := req.FaceListObject.FaceObject[0] if config.ClientConf.Enable && config.ServeConf.Role == "agent" { - a.Repository.MsgForward(&req) + a.Repository.VIIDMsgForward(&req) + } else if config.ServeConf.Role == "cascade" { + logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime) + service.AddFaceCapture(&face) } else { logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime) a.Repository.FaceForward(req.FaceListObject.FaceObject) diff --git a/controller/subscribeCtl.go b/controller/subscribeCtl.go new file mode 100644 index 0000000..9ef4d86 --- /dev/null +++ b/controller/subscribeCtl.go @@ -0,0 +1,80 @@ +package controller + +import ( + "fmt" + "gat1400Exchange/repository" + "gat1400Exchange/vo" + "net/http" + "strings" + "time" + + "github.com/gin-gonic/gin" +) + +type SubscribeController struct { + Repository repository.SubscribeRepository +} + +// 鏋勯�犲嚱鏁� +func NewSubscribeController() SubscribeController { + svr := repository.NewSubscribeRepository() + controller := SubscribeController{svr} + + return controller +} + +func (s SubscribeController) Subscribes(c *gin.Context) { + var req vo.RequestSubscribe + if err := c.BindJSON(&req); err != nil { + c.AbortWithStatus(http.StatusBadRequest) + return + } + + fromId := c.GetHeader("User-Identify") + + var rsp vo.ResponseStatusList + for idx, sub := range req.SubscribeListObject.SubscribeObject { + if err := s.Repository.CreateSubscribe(fromId, &req.SubscribeListObject.SubscribeObject[idx]); err == nil { + rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{ + RequestURL: c.FullPath(), + StatusCode: vo.StatusSuccess, + StatusString: vo.StatusString[vo.StatusSuccess], + Id: sub.SubscribeID, + LocalTime: time.Now().Format("20060102150405"), + }) + } + } + + c.Header("Content-Type", "application/VIID+json;charset=UTF-8") + c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp}) +} + +func (s SubscribeController) DeleteSubscribe(c *gin.Context) { + idList := c.Query("IDList") + var rsp vo.ResponseStatusList + + for _, id := range strings.Split(idList, ",") { + if err := s.Repository.DeleteSubscribe(id); err == nil { + rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{ + RequestURL: c.FullPath(), + StatusCode: vo.StatusSuccess, + StatusString: vo.StatusString[vo.StatusSuccess], + Id: id, + LocalTime: time.Now().Format("20060102150405"), + }) + } + } + + c.Header("Content-Type", "application/VIID+json;charset=UTF-8") + c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp}) +} + +func (s SubscribeController) Notifications(c *gin.Context) { + data, err := c.GetRawData() + if err != nil { + + } + + fmt.Println("Notifications:", string(data)) + c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": ""}) +} diff --git a/controller/systemCtl.go b/controller/systemCtl.go index dbf947b..567f401 100644 --- a/controller/systemCtl.go +++ b/controller/systemCtl.go @@ -1,6 +1,7 @@ package controller import ( + "gat1400Exchange/pkg/logger" "gat1400Exchange/service" "net/http" "time" @@ -14,17 +15,20 @@ ) type SystemController struct { - Auth *auth.DigestAuth - Repository repository.SystemRepository + Auth *auth.DigestAuth + ApeRepo repository.ApeRepository } // 鏋勯�犲嚱鏁� func NewSystemController() SystemController { - svr := repository.NewSystemRepository() - controller := SystemController{Repository: svr} + svr := repository.NewApeRepository() + controller := SystemController{ApeRepo: svr} controller.Auth = auth.NewDigestAuthenticator("Basic Realm", func(user, realm string) string { - // 闇�瑕佸湪杩欓噷瀹炵幇妫�鏌ョ敤鎴峰悕鍜屽瘑鐮佹槸鍚︽湁鏁堢殑閫昏緫, 鐩墠浣跨敤鍥哄畾瀵嗙爜 + if user == config.ServeConf.Username { + return config.ServeConf.Password + } + return config.ServeConf.Password }) @@ -42,12 +46,26 @@ return } + var req vo.RequestRegister + if err := c.BindJSON(&req); err != nil { + c.AbortWithStatus(http.StatusBadRequest) + return + } + rspMsg := vo.ResponseStatus{ RequestURL: c.FullPath(), StatusCode: vo.StatusSuccess, StatusString: vo.StatusString[vo.StatusSuccess], - Id: user, + Id: req.RegisterObject.DeviceID, LocalTime: time.Now().Format("20060102150405"), + } + + if user == config.ServeConf.Username { + if err := s.ApeRepo.Create(req.RegisterObject.DeviceID); err != nil { + logger.Warn("Create ape failure,%s", err.Error()) + c.AbortWithStatus(http.StatusInternalServerError) + return + } } c.JSON(http.StatusOK, gin.H{"ResponseStatusObject": rspMsg}) @@ -62,6 +80,7 @@ } service.KeepDeviceAlive(req.KeepaliveObject.DeviceID) + s.ApeRepo.Keepalive(req.KeepaliveObject.DeviceID) rspMsg := vo.ResponseStatus{ RequestURL: c.FullPath(), @@ -104,3 +123,30 @@ c.JSON(http.StatusOK, gin.H{"SystemTimeObject": rspMsg}) } + +// 璁惧鍒楄〃 +func (s SystemController) ApeList(c *gin.Context) { + apeList, err := s.ApeRepo.List() + if err != nil { + logger.Error(err.Error()) + } + + c.JSON(http.StatusOK, gin.H{"ApeList": apeList}) +} + +// 淇敼璁惧 +func (s SystemController) ApeUpdate(c *gin.Context) { + var req vo.Ape + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) + return + } + + err := s.ApeRepo.Update(&req) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"msg": "ok"}) +} diff --git a/main.go b/main.go index 316e032..d457661 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,8 @@ // 鍚姩缃戠粶瑙嗛瀛楃鍙犲姞鍣ㄦ湇鍔� go service.NVCSServer() + go service.InitSubscribeTask() + // 鍚姩瀹氭椂浠诲姟 cron.Init() @@ -75,5 +77,7 @@ logger.Error("Server forced to shutdown:", err) } + service.StopSubscribeTask() + logger.Info("Server exiting!") } diff --git a/models/ape.go b/models/ape.go new file mode 100644 index 0000000..89ccdce --- /dev/null +++ b/models/ape.go @@ -0,0 +1,41 @@ +package models + +import ( + "gat1400Exchange/vo" + "time" +) + +type Ape struct { + Id string `gorm:"column:id;primary_key;" json:"id"` + Name string `gorm:"column:name" json:"name"` + HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"` + Ext vo.Ape `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"` + CreateTime int64 `gorm:"column:create_time;autoCreateTime;" json:"create_time"` + UpdateTime int64 `gorm:"column:update_time;autoUpdateTime" json:"-"` + DeleteTime int64 `gorm:"column:delete_time" json:"-"` +} + +func (a *Ape) TableName() string { + return "apes" +} + +func (a *Ape) FindById(id string) error { + return db.Table(a.TableName()).First(&a, "id = ?", id).Error +} + +func (a *Ape) Save() error { + return db.Table(a.TableName()).Save(a).Error +} + +func (a *Ape) Keepalive() error { + return db.Table(a.TableName()).Where("id = ?", a.Id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error +} + +func (a *Ape) FindAll() ([]Ape, error) { + var devices []Ape + if err := db.Table(a.TableName()).Find(&devices).Error; err != nil { + return nil, err + } + + return devices, nil +} diff --git a/models/cascade.go b/models/cascade.go new file mode 100644 index 0000000..c857d79 --- /dev/null +++ b/models/cascade.go @@ -0,0 +1,19 @@ +package models + +type Cascade struct { + Id string `gorm:"column:id;primary_key;" json:"id"` + Name string `gorm:"column:name" json:"name"` + Username string `gorm:"column:username" json:"username"` + Password string `gorm:"column:password" json:"password"` + IP string `gorm:"column:ip" json:"ip"` + Port int `gorm:"column:port" json:"port"` + Enabled bool `gorm:"column:enabled" json:"enabled"` + DeviceIDs []string `gorm:"column:device_ids;type:text[];default '{}'" json:"device_ids"` + CreateTime int64 `gorm:"column:create_time;autoCreateTime;" json:"create_time"` + UpdateTime int64 `gorm:"column:update_time;autoUpdateTime" json:"-"` + DeleteTime int64 `gorm:"column:delete_time" json:"-"` +} + +func (c *Cascade) TableName() string { + return "cascades" +} diff --git a/models/db.go b/models/db.go index d6cb1f7..e7dab32 100644 --- a/models/db.go +++ b/models/db.go @@ -20,7 +20,15 @@ logger.Debug("db open error ", err) return err } - _ = db.AutoMigrate(&Device{}, &Positions{}, &Cache{}) + _ = db.AutoMigrate( + &Ape{}, + &Cache{}, + &Cascade{}, + &Device{}, + &Positions{}, + &SubPlatform{}, + &Subscribe{}, + ) // 娣诲姞榛樿鏁版嵁 InitData() diff --git a/models/subplatform.go b/models/subplatform.go new file mode 100644 index 0000000..470defe --- /dev/null +++ b/models/subplatform.go @@ -0,0 +1,20 @@ +package models + +type SubPlatform struct { + Id string `gorm:"primary_key;" json:"id"` + HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"` + Name string `gorm:"not null;default:''" json:"name"` + UserName string `gorm:"not null;default:''" json:"user_name"` + Realm string `gorm:"not null;default:''" json:"realm"` + Password string `gorm:"not null;default:''" json:"password"` + Description string `gorm:"not null;default:''" json:"description"` + RemoteIP string `gorm:"not null;default:''" json:"remote_ip"` + RemotePort int `gorm:"not null;default:0" json:"remote_port"` + CreateTime int64 `gorm:"column:create_time;autoCreateTime;" json:"create_time"` + UpdateTime int64 `gorm:"column:update_time;autoUpdateTime" json:"-"` + DeleteTime int64 `gorm:"column:delete_time" json:"-"` +} + +func (s *SubPlatform) TableName() string { + return "sub_platforms" +} diff --git a/models/subscribe.go b/models/subscribe.go new file mode 100644 index 0000000..62cc18b --- /dev/null +++ b/models/subscribe.go @@ -0,0 +1,40 @@ +package models + +import ( + "gat1400Exchange/vo" +) + +type Subscribe struct { + Id string `gorm:"column:id;primary_key;" json:"id"` + Status int `gorm:"column:status" json:"status"` // 0锛氳闃呬腑 1锛氬凡鍙栨秷璁㈤槄 2锛氳闃呭埌鏈� 9锛氭湭璁㈤槄 + FromId string `gorm:"column:from_id" json:"from_id"` + Ext vo.Subscribe `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"` + CreateTime int64 `gorm:"column:create_time;autoCreateTime;" json:"create_time"` + UpdateTime int64 `gorm:"column:update_time;autoUpdateTime" json:"-"` + DeleteTime int64 `gorm:"column:delete_time" json:"-"` +} + +func (s *Subscribe) TableName() string { + return "subscribes" +} + +func (s *Subscribe) FindById(id string) error { + return db.Table(s.TableName()).First(&s, "id = ?", id).Error +} + +func (s *Subscribe) Save() error { + return db.Table(s.TableName()).Save(s).Error +} + +func (s *Subscribe) DeleteById(id string) error { + return db.Table(s.TableName()).Where("id = ?", id).Delete(s).Error +} + +func (s *Subscribe) FindAll() ([]Subscribe, error) { + var subs []Subscribe + if err := db.Table(s.TableName()).Find(&subs).Error; err != nil { + return nil, err + } + + return subs, nil +} diff --git a/pkg/snowflake/snowflake.go b/pkg/snowflake/snowflake.go new file mode 100644 index 0000000..ad86994 --- /dev/null +++ b/pkg/snowflake/snowflake.go @@ -0,0 +1,118 @@ +package snowflake + +import ( + "errors" + "strconv" + "sync" + "time" +) + +const ( + CEpoch = 1474802888000 + CWorkerIdBits = 10 // Num of WorkerId Bits + CSenquenceBits = 12 // Num of Sequence Bits + + CWorkerIdShift = 12 + CTimeStampShift = 22 + + CSequenceMask = 0xfff // equal as getSequenceMask() + CMaxWorker = 0x3ff // equal as getMaxWorkerId() +) + +type IdWorker struct { + workerId int64 + lastTimeStamp int64 + sequence int64 + maxWorkerId int64 + lock *sync.Mutex +} + +func NewIdWorker(workerId int64) (iw *IdWorker, err error) { + iw = new(IdWorker) + + iw.maxWorkerId = getMaxWorkerId() + + if workerId > iw.maxWorkerId || workerId < 0 { + return nil, errors.New("worker not fit") + } + iw.workerId = workerId + iw.lastTimeStamp = -1 + iw.sequence = 0 + iw.lock = new(sync.Mutex) + return iw, nil +} + +func getMaxWorkerId() int64 { + return -1 ^ -1<<CWorkerIdBits +} + +func getSequenceMask() int64 { + return -1 ^ -1<<CSenquenceBits +} + +// return in ms +func (iw *IdWorker) timeGen() int64 { + return time.Now().UnixNano() / 1000 / 1000 +} + +func (iw *IdWorker) timeReGen(last int64) int64 { + ts := time.Now().UnixNano() / 1000 / 1000 + for { + if ts <= last { + ts = iw.timeGen() + } else { + break + } + } + return ts +} + +func (iw *IdWorker) NextId() (ts int64, err error) { + iw.lock.Lock() + defer iw.lock.Unlock() + ts = iw.timeGen() + if ts == iw.lastTimeStamp { + iw.sequence = (iw.sequence + 1) & CSequenceMask + if iw.sequence == 0 { + ts = iw.timeReGen(ts) + } + } else { + iw.sequence = 0 + } + + if ts < iw.lastTimeStamp { + err = errors.New("Clock moved backwards, Refuse gen id") + return 0, err + } + iw.lastTimeStamp = ts + ts = (ts-CEpoch)<<CTimeStampShift | iw.workerId<<CWorkerIdShift | iw.sequence + return ts, nil +} + +func ParseId(id int64) (t time.Time, ts int64, workerId int64, seq int64) { + seq = id & CSequenceMask + workerId = (id >> CWorkerIdShift) & CMaxWorker + ts = (id >> CTimeStampShift) + CEpoch + t = time.Unix(ts/1000, (ts%1000)*1000000) + return +} + +var idGenerater, _ = NewIdWorker(0) + +func GenerateId() int64 { +start: + id, err := idGenerater.NextId() + if err != nil { + goto start + } + return id +} + +func GenerateIdStr() string { +start: + id, err := idGenerater.NextId() + if err != nil { + goto start + } + return strconv.FormatInt(id, 10) +} diff --git a/repository/apeRepo.go b/repository/apeRepo.go new file mode 100644 index 0000000..379ec9a --- /dev/null +++ b/repository/apeRepo.go @@ -0,0 +1,82 @@ +package repository + +import ( + "time" + + "gat1400Exchange/models" + "gat1400Exchange/vo" +) + +type ApeRepository struct { +} + +func NewApeRepository() ApeRepository { + return ApeRepository{} +} + +func (a *ApeRepository) Keepalive(id string) error { + var ape models.Ape + + // 璁惧瀛樺湪 + if err := ape.FindById(id); err != nil { + return nil + } + + return ape.Keepalive() +} + +func (a *ApeRepository) Create(id string) error { + var ape models.Ape + + // 璁惧瀛樺湪 + if err := ape.FindById(id); err == nil { + return nil + } + + ape.Id = id + ape.Name = id + ape.HeartbeatTime = time.Now().Format("2006-01-02 15:04:05") + ape.Ext = vo.Ape{ + ApeID: id, + Name: "", + Model: "", + IPAddr: "", + IPV6Addr: "", + Port: 0, + Longitude: 0, + Latitude: 0, + PlaceCode: "", + Place: "", + OrgCode: "", + CapDirection: 0, + MonitorDirection: "", + MonitorAreaDesc: "", + IsOnline: "2", + OwnerApsID: "", + UserID: "", + Password: "", + FunctionType: "2", + } + + return ape.Save() +} + +func (a *ApeRepository) List() ([]models.Ape, error) { + var ape models.Ape + + return ape.FindAll() +} + +func (a *ApeRepository) Update(req *vo.Ape) error { + var ape models.Ape + + err := ape.FindById(req.ApeID) + if err != nil { + return err + } + + ape.Name = req.Name + ape.Ext = *req + + return ape.Save() +} diff --git a/repository/captureRepo.go b/repository/captureRepo.go index c5516e1..cddc2a6 100644 --- a/repository/captureRepo.go +++ b/repository/captureRepo.go @@ -212,7 +212,7 @@ cacheItem.Save() } -func (c CaptureRepository) MsgForward(msg *vo.RequestFaceList) { +func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) { faceInfo := msg.FaceListObject.FaceObject[0] // 鍖归厤妤煎眰 faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local) diff --git a/repository/subscribeRepo.go b/repository/subscribeRepo.go new file mode 100644 index 0000000..f5101b6 --- /dev/null +++ b/repository/subscribeRepo.go @@ -0,0 +1,44 @@ +package repository + +import ( + "gat1400Exchange/models" + "gat1400Exchange/service" + "gat1400Exchange/vo" +) + +type SubscribeRepository struct { +} + +func NewSubscribeRepository() SubscribeRepository { + return SubscribeRepository{} +} + +func (a *SubscribeRepository) CreateSubscribe(fromId string, subscribe *vo.Subscribe) error { + var sub = models.Subscribe{ + Id: subscribe.SubscribeID, + Status: subscribe.SubscribeStatus, + FromId: fromId, + Ext: *subscribe, + } + + err := sub.Save() + if err != nil { + return err + } + + service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) + + return err +} + +func (a *SubscribeRepository) DeleteSubscribe(id string) error { + var sub = models.Subscribe{} + err := sub.DeleteById(id) + if err != nil { + return err + } + + service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil) + + return err +} diff --git a/routes/routes.go b/routes/routes.go index 5827b2f..09ff41a 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -35,6 +35,8 @@ // 娉ㄥ唽閲囬泦鎺ュ彛璺敱 InitCaptureRouters(apiGroup) + InitSubscribesRouters(apiGroup) + logger.Info("鍒濆鍖栬矾鐢卞畬鎴愶紒") return r diff --git a/routes/subscribe.go b/routes/subscribe.go new file mode 100644 index 0000000..9710832 --- /dev/null +++ b/routes/subscribe.go @@ -0,0 +1,16 @@ +package routes + +import ( + "gat1400Exchange/controller" + "github.com/gin-gonic/gin" +) + +func InitSubscribesRouters(r *gin.RouterGroup) gin.IRoutes { + subCtl := controller.NewSubscribeController() + + r.POST("/Subscribes", subCtl.Subscribes) + r.DELETE("/Subscribes", subCtl.DeleteSubscribe) + r.POST("/SubscribeNotifications", subCtl.Notifications) + + return r +} diff --git a/routes/system.go b/routes/system.go index ba2611f..519498a 100644 --- a/routes/system.go +++ b/routes/system.go @@ -17,5 +17,9 @@ router.POST("/Keepalive", sysCtl.Keepalive) router.GET("/Time", sysCtl.Time) } + + r.GET("/APEs", sysCtl.ApeList) + r.POST("/APEs", sysCtl.ApeUpdate) + return r } diff --git a/service/device.go b/service/device.go index acfaf18..86d9b77 100644 --- a/service/device.go +++ b/service/device.go @@ -38,12 +38,12 @@ } func DeviceInfoReportTask() error { - logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer) - if config.ServeConf.Role == "agent" { return nil } + logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer) + if config.ForwardConf.ReportServer == "" { return errors.New("Server addr is empty!") } diff --git a/service/resend.go b/service/resend.go index e56ffc1..ac62207 100644 --- a/service/resend.go +++ b/service/resend.go @@ -10,15 +10,15 @@ ) func ResendImageData() { - var cacheMod models.Cache - cacheItems, _ := cacheMod.FindAll() - - logger.Debug("Start resend task. cache len:%d", len(cacheItems)) if err := util.HttpGet(config.ForwardConf.SyncServer); err != nil { logger.Debug("The server cannot be reached. %s", err.Error()) return } + var cacheMod models.Cache + cacheItems, _ := cacheMod.FindAll() + logger.Debug("Start resend task. cache len:%d", len(cacheItems)) + for _, c := range cacheItems { if c.Type == "1400" { if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess { diff --git a/service/subscribe.go b/service/subscribe.go new file mode 100644 index 0000000..574cb8e --- /dev/null +++ b/service/subscribe.go @@ -0,0 +1,213 @@ +package service + +import ( + "context" + "encoding/json" + "strings" + "sync" + "time" + + "gat1400Exchange/client" + "gat1400Exchange/models" + "gat1400Exchange/pkg/logger" + "gat1400Exchange/pkg/snowflake" + "gat1400Exchange/vo" +) + +var TaskProcMap sync.Map +var TaskWaitGroup = &sync.WaitGroup{} + +type TaskProcInfo struct { + cancel context.CancelFunc + task *SubscribeTask +} + +func (t *TaskProcInfo) stop() { + t.cancel() + t.task = nil +} + +func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { + logger.Debug("Receive sub msg: %s ", subId) + + proc, isExist := TaskProcMap.Load(subId) + + switch msgType { + case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: + if isExist { + logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�") + proc.(TaskProcInfo).cancel() + } + + CreateTask(conf) + case vo.Msg_Type_Delete_Subscribe: + if !isExist { + return + } + + // 鍏抽棴浠诲姟, 骞跺垹闄� + proc.(TaskProcInfo).cancel() + default: + logger.Warn("鏈煡鐨勬秷鎭被鍨�") + } +} + +func InitSubscribeTask() error { + var s models.Subscribe + subList, err := s.FindAll() + if err != nil { + logger.Error("Find account by channel error:%v", err) + return err + } + + for idx, _ := range subList { + if subList[idx].Status != 0 { + continue + } + + CreateTask(&subList[idx]) + } + + return nil +} + +func AddFaceCapture(face *vo.FaceObject) { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).task.AddFace(face) + return true + }) + + logger.Debug("娣诲姞浜鸿劯.") + //TaskWaitGroup.Wait() +} + +func StopSubscribeTask() { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).cancel() + return true + }) + + logger.Debug("绛夊緟鎵�鏈変换鍔¢��鍑�.") + TaskWaitGroup.Wait() +} + +func CreateTask(conf *models.Subscribe) { + logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id) + + ctx, cancel := context.WithCancel(context.Background()) + newTask := &SubscribeTask{ + ctx: ctx, + conf: conf, + } + + TaskProcMap.Store(conf.Id, TaskProcInfo{ + cancel: cancel, + task: newTask, + }) + + go newTask.Start() +} + +type SubscribeTask struct { + ctx context.Context + conf *models.Subscribe + faceList []*vo.FaceObject + mutex sync.Mutex +} + +func (task *SubscribeTask) Start() { + TaskWaitGroup.Add(1) + Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) + + for { + select { + case <-task.ctx.Done(): + TaskProcMap.Delete(task.conf.Id) + TaskWaitGroup.Done() + logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) + return + case <-Timer.C: + task.Notify() + } + } +} + +func (task *SubscribeTask) AddFace(face *vo.FaceObject) { + if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { + return + } + + task.mutex.Lock() + defer task.mutex.Unlock() + + task.faceList = append(task.faceList, face) +} + +func (task *SubscribeTask) Notify() { + subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") + for _, subType := range subDetails { + triggerTime := time.Now().Format("20060102150405") + + // 涓婃姤璁惧 + if subType == vo.SubscribeApe { + var notification = vo.DeviceNotification{ + NotificationID: triggerTime + snowflake.GenerateIdStr(), + SubscribeID: task.conf.Id, + Title: task.conf.Ext.Title, + TriggerTime: triggerTime, + ExecuteOperation: 1, + } + + var ids []string + var apeMod models.Ape + apeList, err := apeMod.FindAll() + if err != nil { + logger.Warn(err.Error()) + continue + } else { + for idx, _ := range apeList { + ids = append(ids, apeList[idx].Id) + notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) + } + } + + notification.InfoIDs = strings.Join(ids, ";") + + var req vo.RequestSubscribeNotification + req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) + b, _ := json.Marshal(req) + client.Notify(task.conf.Ext.ReceiveAddr, b) + continue + } + + // 涓婃姤浜鸿劯 + if subType == vo.SubscribeFace { + if len(task.faceList) == 0 { + return + } + + var notification = vo.FaceNotification{ + NotificationID: triggerTime + snowflake.GenerateIdStr(), + SubscribeID: task.conf.Id, + Title: task.conf.Ext.Title, + TriggerTime: triggerTime, + } + var ids []string + for idx, _ := range task.faceList { + ids = append(ids, task.faceList[idx].FaceID) + notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) + } + + notification.InfoIDs = strings.Join(ids, ";") + + task.mutex.Lock() + task.faceList = []*vo.FaceObject{} + task.mutex.Unlock() + + var req vo.RequestSubscribeNotification + req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) + b, _ := json.Marshal(req) + client.Notify(task.conf.Ext.ReceiveAddr, b) + } + } +} diff --git a/vo/ape.go b/vo/ape.go new file mode 100644 index 0000000..f45c289 --- /dev/null +++ b/vo/ape.go @@ -0,0 +1,58 @@ +package vo + +import ( + "database/sql/driver" + "encoding/json" + "errors" +) + +type Ape struct { + ApeID string `json:"ApeID"` + Name string `json:"Name"` + Model string `json:"Model"` + IPAddr string `json:"IPAddr"` + IPV6Addr string `json:"IPV6Addr"` + Port int `json:"Port"` + Longitude float64 `json:"Longitude"` + Latitude float64 `json:"Latitude"` + PlaceCode string `json:"PlaceCode"` + Place string `json:"Place"` + OrgCode string `json:"OrgCode"` + CapDirection int `json:"CapDirection"` + MonitorDirection string `json:"MonitorDirection"` + MonitorAreaDesc string `json:"MonitorAreaDesc"` + IsOnline string `json:"IsOnline"` + OwnerApsID string `json:"OwnerApsID"` + UserID string `json:"UserId"` + Password string `json:"Password"` + FunctionType string `json:"FunctionType"` + PositionType string `json:"PositionType"` +} + +func (a *Ape) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("failed to unmarshal Author value") + } + var config Ape + err := json.Unmarshal(b, &config) + if err != nil { + return err + } + *a = config + return nil +} + +func (a Ape) Value() (driver.Value, error) { + return json.Marshal(a) +} + +type RequestApeList struct { + APEListObject struct { + APEObject []Ape `json:"APEObject"` + } `json:"APEListObject"` +} + +type NotificationApeList struct { + APEObject []Ape `json:"APEObject"` +} diff --git a/vo/constant.go b/vo/constant.go index a5c4442..c2352ff 100644 --- a/vo/constant.go +++ b/vo/constant.go @@ -26,3 +26,28 @@ StatusInvalidJsonContent: "JSON鍐呭鏃犳晥", StatusReboot: "绯荤粺閲嶅惎涓�", } + +// 1 妗堬紙浜嬶級浠剁洰褰� +// 2 鍗曚釜妗堬紙浜嬶級浠跺唴瀹� +// 4 閲囬泦璁惧鐘舵�� +// 5 閲囬泦绯荤粺鐩綍 +// 6 閲囬泦绯荤粺鐘舵�� +// 7 瑙嗛鍗″彛鐩綍 +// 8 鍗曚釜鍗″彛璁板綍 +// 9 杞﹂亾鐩綍 +// 10 鍗曚釜杞﹂亾璁板綍 +// 13 鑷姩閲囬泦鐨勮溅杈嗕俊鎭� +// 14 鑷姩閲囬泦鐨勯潪鏈哄姩杞﹁締淇℃伅 +// 15 鑷姩閲囬泦鐨勭墿鍝佷俊鎭� +// 16 鑷姩閲囬泦鐨勬枃浠朵俊鎭� +const ( + SubscribeApe = "3" // 閲囬泦璁惧鐩綍 + SubscribePerson = "11" // 鑷姩閲囬泦鐨勪汉鍛樹俊鎭� + SubscribeFace = "12" // 鑷姩閲囬泦鐨勪汉鑴镐俊鎭� +) + +const ( + Msg_Type_Create_Subscribe int = iota + Msg_Type_Update_Subscribe + Msg_Type_Delete_Subscribe +) diff --git a/vo/face.go b/vo/face.go index fd10b74..19e31da 100644 --- a/vo/face.go +++ b/vo/face.go @@ -95,3 +95,7 @@ FaceObject []FaceObject `json:"FaceObject"` } `json:"FaceListObject"` } + +type NotificationFaceList struct { + FaceObject []FaceObject `json:"FaceObject"` +} diff --git a/vo/message.go b/vo/message.go index 6ce5566..bcb4f33 100644 --- a/vo/message.go +++ b/vo/message.go @@ -30,3 +30,7 @@ LocalTime string TimeZone interface{} } + +type ResponseStatusList struct { + ResponseStatusObject []ResponseStatus +} diff --git a/vo/subscribe.go b/vo/subscribe.go new file mode 100644 index 0000000..29c7022 --- /dev/null +++ b/vo/subscribe.go @@ -0,0 +1,76 @@ +package vo + +import ( + "database/sql/driver" + "encoding/json" + "errors" +) + +type Subscribe struct { + SubscribeID string `json:"SubscribeID"` + Title string `json:"Title"` + SubscribeDetail string `json:"SubscribeDetail"` + ResourceURI string `json:"ResourceURI"` + ApplicantName string `json:"ApplicantName"` + ApplicantOrg string `json:"ApplicantOrg"` + BeginTime string `json:"BeginTime"` // Kept as string for direct compatibility + EndTime string `json:"EndTime"` // Kept as string for direct compatibility + ReceiveAddr string `json:"ReceiveAddr"` + ReportInterval int `json:"ReportInterval"` + Reason string `json:"Reason"` + OperateType int `json:"OperateType"` + SubscribeStatus int `json:"SubscribeStatus"` + SubscribeCancelOrg string `json:"SubscribeCancelOrg"` + SubscribeCancelPerson string `json:"SubscribeCancelPerson"` + CancelTime string `json:"CancelTime"` // Kept as string for direct compatibility + CancelReason string `json:"CancelReason"` +} + +func (s *Subscribe) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("failed to unmarshal Author value") + } + var config Subscribe + err := json.Unmarshal(b, &config) + if err != nil { + return err + } + *s = config + return nil +} + +func (s Subscribe) Value() (driver.Value, error) { + return json.Marshal(s) +} + +type RequestSubscribe struct { + SubscribeListObject struct { + SubscribeObject []Subscribe `json:"SubscribeObject"` + } `json:"SubscribeListObject"` +} + +type RequestSubscribeNotification struct { + SubscribeNotificationListObject struct { + SubscribeNotificationObject []interface{} `json:"SubscribeNotificationObject"` + } `json:"SubscribeNotificationListObject"` +} + +type DeviceNotification struct { + NotificationID string + SubscribeID string + Title string + TriggerTime string + InfoIDs string + ExecuteOperation int + DeviceList NotificationApeList +} + +type FaceNotification struct { + NotificationID string + SubscribeID string + Title string + TriggerTime string + InfoIDs string + FaceObjectList NotificationFaceList +} -- Gitblit v1.8.0