From 790c60c55054b3e75043eaed11eaef8584d2001d Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期四, 16 五月 2024 09:57:59 +0800
Subject: [PATCH] 添加级联,订阅功能

---
 vo/constant.go              |   25 +
 vo/face.go                  |    4 
 vo/subscribe.go             |   76 ++++
 vo/ape.go                   |   58 +++
 repository/apeRepo.go       |   82 ++++
 models/ape.go               |   41 ++
 models/subplatform.go       |   20 +
 controller/captureCtl.go    |    5 
 service/subscribe.go        |  213 ++++++++++++
 routes/subscribe.go         |   16 
 service/device.go           |    4 
 client/notify.go            |   31 +
 repository/subscribeRepo.go |   44 ++
 controller/systemCtl.go     |   58 +++
 client/system.go            |    2 
 vo/message.go               |    4 
 service/resend.go           |    8 
 client/faces.go             |    3 
 routes/routes.go            |    2 
 models/subscribe.go         |   40 ++
 routes/system.go            |    4 
 pkg/snowflake/snowflake.go  |  118 ++++++
 config/config.go            |    2 
 models/cascade.go           |   19 +
 repository/captureRepo.go   |    2 
 config/gat1400.yaml         |   21 +
 models/db.go                |   10 
 main.go                     |    4 
 controller/subscribeCtl.go  |   80 ++++
 29 files changed, 976 insertions(+), 20 deletions(-)

diff --git a/client/faces.go b/client/faces.go
index 7dde6a4..c4f1f17 100644
--- a/client/faces.go
+++ b/client/faces.go
@@ -1,8 +1,9 @@
 package client
 
 import (
-	"encoding/json"
 	"fmt"
+
+	"encoding/json"
 	"gat1400Exchange/config"
 	"gat1400Exchange/pkg/logger"
 	"gat1400Exchange/util"
diff --git a/client/notify.go b/client/notify.go
new file mode 100644
index 0000000..9532e84
--- /dev/null
+++ b/client/notify.go
@@ -0,0 +1,31 @@
+package client
+
+import (
+	"encoding/json"
+	"gat1400Exchange/pkg/logger"
+	"gat1400Exchange/util"
+	"gat1400Exchange/vo"
+)
+
+func Notify(url string, msg []byte) int {
+	if clientStatus != vo.StatusSuccess {
+		return clientStatus
+	}
+
+	rsp, err := util.HttpPost(url, headers, msg)
+	if err != nil {
+		logger.Warn("Post notification failed, %s", err.Error())
+		return vo.StatusOtherError
+	}
+
+	var stat vo.ResponseStatusList
+	err = json.Unmarshal(rsp, &stat)
+	if err != nil {
+		logger.Warn("Post notification response unmarshal failed, %s", err.Error())
+		return vo.StatusOtherError
+	}
+
+	logger.Debug("Post notification success.")
+
+	return vo.StatusSuccess
+}
diff --git a/client/system.go b/client/system.go
index 80e8858..b2e540b 100644
--- a/client/system.go
+++ b/client/system.go
@@ -3,11 +3,11 @@
 import (
 	"encoding/json"
 	"fmt"
-	"gat1400Exchange/util"
 	"io/ioutil"
 
 	"gat1400Exchange/config"
 	"gat1400Exchange/pkg/logger"
+	"gat1400Exchange/util"
 	"gat1400Exchange/vo"
 
 	dac "github.com/xinsnake/go-http-digest-auth-client"
diff --git a/config/config.go b/config/config.go
index bc53545..84b926e 100644
--- a/config/config.go
+++ b/config/config.go
@@ -13,6 +13,8 @@
 	Mode     string `mapstructure:"mode"`
 	Host     string `mapstructure:"host"`
 	Port     string `mapstructure:"port"`
+	Realm    string `mapstructure:"realm"`
+	Username string `mapstructure:"username"`
 	Password string `mapstructure:"password"`
 	Role     string `mapstructure:"role"` // agent 璁惧绔�, proxy 1400涓浆 涓嶅鐞哾eivce, server 鍏ㄥ姛鑳�
 }
diff --git a/config/gat1400.yaml b/config/gat1400.yaml
index 40ae1ce..f4aa22d 100644
--- a/config/gat1400.yaml
+++ b/config/gat1400.yaml
@@ -1,11 +1,26 @@
 # web 鏈嶅姟閰嶇疆
 serve:
   # 璁惧畾妯″紡(debug/release/test,姝e紡鐗堟敼涓簉elease)
-  id: "11010500011121000001"
+  id: "12312312315031231233"
   mode: "debug"
   port: "1400"
-  host: "0.0.0.0"
-  password: "basic1400server"
+  host: "192.168.20.119"
+  username: "admin"
+  password: "Aa123456"
+  realm: "Basic Realm"
+  role: "cascade"
+client:
+  enable: true
+  device-id: "12312312315031200003"
+  username: "12312312315031200003"
+  password: "123456"
+  server-addr: "192.168.20.189"
+  server-port: 1400
+  proto: http
+  upload-type: binary
+  channel-number: "12312312315031200003"
+  heartbeat-interval: 30
+  heartbeat-count: 3
 
 # 鏃ュ織閰嶇疆
 log:
diff --git a/controller/captureCtl.go b/controller/captureCtl.go
index ef00027..dbf3b31 100644
--- a/controller/captureCtl.go
+++ b/controller/captureCtl.go
@@ -41,7 +41,10 @@
 	// 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒
 	face := req.FaceListObject.FaceObject[0]
 	if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
-		a.Repository.MsgForward(&req)
+		a.Repository.VIIDMsgForward(&req)
+	} else if config.ServeConf.Role == "cascade" {
+		logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime)
+		service.AddFaceCapture(&face)
 	} else {
 		logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime)
 		a.Repository.FaceForward(req.FaceListObject.FaceObject)
diff --git a/controller/subscribeCtl.go b/controller/subscribeCtl.go
new file mode 100644
index 0000000..9ef4d86
--- /dev/null
+++ b/controller/subscribeCtl.go
@@ -0,0 +1,80 @@
+package controller
+
+import (
+	"fmt"
+	"gat1400Exchange/repository"
+	"gat1400Exchange/vo"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/gin-gonic/gin"
+)
+
+type SubscribeController struct {
+	Repository repository.SubscribeRepository
+}
+
+// 鏋勯�犲嚱鏁�
+func NewSubscribeController() SubscribeController {
+	svr := repository.NewSubscribeRepository()
+	controller := SubscribeController{svr}
+
+	return controller
+}
+
+func (s SubscribeController) Subscribes(c *gin.Context) {
+	var req vo.RequestSubscribe
+	if err := c.BindJSON(&req); err != nil {
+		c.AbortWithStatus(http.StatusBadRequest)
+		return
+	}
+
+	fromId := c.GetHeader("User-Identify")
+
+	var rsp vo.ResponseStatusList
+	for idx, sub := range req.SubscribeListObject.SubscribeObject {
+		if err := s.Repository.CreateSubscribe(fromId, &req.SubscribeListObject.SubscribeObject[idx]); err == nil {
+			rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{
+				RequestURL:   c.FullPath(),
+				StatusCode:   vo.StatusSuccess,
+				StatusString: vo.StatusString[vo.StatusSuccess],
+				Id:           sub.SubscribeID,
+				LocalTime:    time.Now().Format("20060102150405"),
+			})
+		}
+	}
+
+	c.Header("Content-Type", "application/VIID+json;charset=UTF-8")
+	c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp})
+}
+
+func (s SubscribeController) DeleteSubscribe(c *gin.Context) {
+	idList := c.Query("IDList")
+	var rsp vo.ResponseStatusList
+
+	for _, id := range strings.Split(idList, ",") {
+		if err := s.Repository.DeleteSubscribe(id); err == nil {
+			rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{
+				RequestURL:   c.FullPath(),
+				StatusCode:   vo.StatusSuccess,
+				StatusString: vo.StatusString[vo.StatusSuccess],
+				Id:           id,
+				LocalTime:    time.Now().Format("20060102150405"),
+			})
+		}
+	}
+
+	c.Header("Content-Type", "application/VIID+json;charset=UTF-8")
+	c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp})
+}
+
+func (s SubscribeController) Notifications(c *gin.Context) {
+	data, err := c.GetRawData()
+	if err != nil {
+
+	}
+
+	fmt.Println("Notifications:", string(data))
+	c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": ""})
+}
diff --git a/controller/systemCtl.go b/controller/systemCtl.go
index dbf947b..567f401 100644
--- a/controller/systemCtl.go
+++ b/controller/systemCtl.go
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"gat1400Exchange/pkg/logger"
 	"gat1400Exchange/service"
 	"net/http"
 	"time"
@@ -14,17 +15,20 @@
 )
 
 type SystemController struct {
-	Auth       *auth.DigestAuth
-	Repository repository.SystemRepository
+	Auth    *auth.DigestAuth
+	ApeRepo repository.ApeRepository
 }
 
 // 鏋勯�犲嚱鏁�
 func NewSystemController() SystemController {
-	svr := repository.NewSystemRepository()
-	controller := SystemController{Repository: svr}
+	svr := repository.NewApeRepository()
+	controller := SystemController{ApeRepo: svr}
 
 	controller.Auth = auth.NewDigestAuthenticator("Basic Realm", func(user, realm string) string {
-		// 闇�瑕佸湪杩欓噷瀹炵幇妫�鏌ョ敤鎴峰悕鍜屽瘑鐮佹槸鍚︽湁鏁堢殑閫昏緫, 鐩墠浣跨敤鍥哄畾瀵嗙爜
+		if user == config.ServeConf.Username {
+			return config.ServeConf.Password
+		}
+
 		return config.ServeConf.Password
 	})
 
@@ -42,12 +46,26 @@
 		return
 	}
 
+	var req vo.RequestRegister
+	if err := c.BindJSON(&req); err != nil {
+		c.AbortWithStatus(http.StatusBadRequest)
+		return
+	}
+
 	rspMsg := vo.ResponseStatus{
 		RequestURL:   c.FullPath(),
 		StatusCode:   vo.StatusSuccess,
 		StatusString: vo.StatusString[vo.StatusSuccess],
-		Id:           user,
+		Id:           req.RegisterObject.DeviceID,
 		LocalTime:    time.Now().Format("20060102150405"),
+	}
+
+	if user == config.ServeConf.Username {
+		if err := s.ApeRepo.Create(req.RegisterObject.DeviceID); err != nil {
+			logger.Warn("Create ape failure,%s", err.Error())
+			c.AbortWithStatus(http.StatusInternalServerError)
+			return
+		}
 	}
 
 	c.JSON(http.StatusOK, gin.H{"ResponseStatusObject": rspMsg})
@@ -62,6 +80,7 @@
 	}
 
 	service.KeepDeviceAlive(req.KeepaliveObject.DeviceID)
+	s.ApeRepo.Keepalive(req.KeepaliveObject.DeviceID)
 
 	rspMsg := vo.ResponseStatus{
 		RequestURL:   c.FullPath(),
@@ -104,3 +123,30 @@
 
 	c.JSON(http.StatusOK, gin.H{"SystemTimeObject": rspMsg})
 }
+
+// 璁惧鍒楄〃
+func (s SystemController) ApeList(c *gin.Context) {
+	apeList, err := s.ApeRepo.List()
+	if err != nil {
+		logger.Error(err.Error())
+	}
+
+	c.JSON(http.StatusOK, gin.H{"ApeList": apeList})
+}
+
+// 淇敼璁惧
+func (s SystemController) ApeUpdate(c *gin.Context) {
+	var req vo.Ape
+	if err := c.BindJSON(&req); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
+		return
+	}
+
+	err := s.ApeRepo.Update(&req)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
+		return
+	}
+
+	c.JSON(http.StatusOK, gin.H{"msg": "ok"})
+}
diff --git a/main.go b/main.go
index 316e032..d457661 100644
--- a/main.go
+++ b/main.go
@@ -44,6 +44,8 @@
 	// 鍚姩缃戠粶瑙嗛瀛楃鍙犲姞鍣ㄦ湇鍔�
 	go service.NVCSServer()
 
+	go service.InitSubscribeTask()
+
 	// 鍚姩瀹氭椂浠诲姟
 	cron.Init()
 
@@ -75,5 +77,7 @@
 		logger.Error("Server forced to shutdown:", err)
 	}
 
+	service.StopSubscribeTask()
+
 	logger.Info("Server exiting!")
 }
diff --git a/models/ape.go b/models/ape.go
new file mode 100644
index 0000000..89ccdce
--- /dev/null
+++ b/models/ape.go
@@ -0,0 +1,41 @@
+package models
+
+import (
+	"gat1400Exchange/vo"
+	"time"
+)
+
+type Ape struct {
+	Id            string `gorm:"column:id;primary_key;" json:"id"`
+	Name          string `gorm:"column:name" json:"name"`
+	HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
+	Ext           vo.Ape `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"`
+	CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
+	UpdateTime    int64  `gorm:"column:update_time;autoUpdateTime" json:"-"`
+	DeleteTime    int64  `gorm:"column:delete_time" json:"-"`
+}
+
+func (a *Ape) TableName() string {
+	return "apes"
+}
+
+func (a *Ape) FindById(id string) error {
+	return db.Table(a.TableName()).First(&a, "id = ?", id).Error
+}
+
+func (a *Ape) Save() error {
+	return db.Table(a.TableName()).Save(a).Error
+}
+
+func (a *Ape) Keepalive() error {
+	return db.Table(a.TableName()).Where("id = ?", a.Id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
+}
+
+func (a *Ape) FindAll() ([]Ape, error) {
+	var devices []Ape
+	if err := db.Table(a.TableName()).Find(&devices).Error; err != nil {
+		return nil, err
+	}
+
+	return devices, nil
+}
diff --git a/models/cascade.go b/models/cascade.go
new file mode 100644
index 0000000..c857d79
--- /dev/null
+++ b/models/cascade.go
@@ -0,0 +1,19 @@
+package models
+
+type Cascade struct {
+	Id         string   `gorm:"column:id;primary_key;" json:"id"`
+	Name       string   `gorm:"column:name" json:"name"`
+	Username   string   `gorm:"column:username" json:"username"`
+	Password   string   `gorm:"column:password" json:"password"`
+	IP         string   `gorm:"column:ip" json:"ip"`
+	Port       int      `gorm:"column:port" json:"port"`
+	Enabled    bool     `gorm:"column:enabled" json:"enabled"`
+	DeviceIDs  []string `gorm:"column:device_ids;type:text[];default '{}'" json:"device_ids"`
+	CreateTime int64    `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
+	UpdateTime int64    `gorm:"column:update_time;autoUpdateTime" json:"-"`
+	DeleteTime int64    `gorm:"column:delete_time" json:"-"`
+}
+
+func (c *Cascade) TableName() string {
+	return "cascades"
+}
diff --git a/models/db.go b/models/db.go
index d6cb1f7..e7dab32 100644
--- a/models/db.go
+++ b/models/db.go
@@ -20,7 +20,15 @@
 		logger.Debug("db open error ", err)
 		return err
 	}
-	_ = db.AutoMigrate(&Device{}, &Positions{}, &Cache{})
+	_ = db.AutoMigrate(
+		&Ape{},
+		&Cache{},
+		&Cascade{},
+		&Device{},
+		&Positions{},
+		&SubPlatform{},
+		&Subscribe{},
+	)
 
 	// 娣诲姞榛樿鏁版嵁
 	InitData()
diff --git a/models/subplatform.go b/models/subplatform.go
new file mode 100644
index 0000000..470defe
--- /dev/null
+++ b/models/subplatform.go
@@ -0,0 +1,20 @@
+package models
+
+type SubPlatform struct {
+	Id            string `gorm:"primary_key;" json:"id"`
+	HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
+	Name          string `gorm:"not null;default:''" json:"name"`
+	UserName      string `gorm:"not null;default:''" json:"user_name"`
+	Realm         string `gorm:"not null;default:''" json:"realm"`
+	Password      string `gorm:"not null;default:''" json:"password"`
+	Description   string `gorm:"not null;default:''" json:"description"`
+	RemoteIP      string `gorm:"not null;default:''" json:"remote_ip"`
+	RemotePort    int    `gorm:"not null;default:0" json:"remote_port"`
+	CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
+	UpdateTime    int64  `gorm:"column:update_time;autoUpdateTime" json:"-"`
+	DeleteTime    int64  `gorm:"column:delete_time" json:"-"`
+}
+
+func (s *SubPlatform) TableName() string {
+	return "sub_platforms"
+}
diff --git a/models/subscribe.go b/models/subscribe.go
new file mode 100644
index 0000000..62cc18b
--- /dev/null
+++ b/models/subscribe.go
@@ -0,0 +1,40 @@
+package models
+
+import (
+	"gat1400Exchange/vo"
+)
+
+type Subscribe struct {
+	Id         string       `gorm:"column:id;primary_key;" json:"id"`
+	Status     int          `gorm:"column:status" json:"status"` // 0锛氳闃呬腑 1锛氬凡鍙栨秷璁㈤槄 2锛氳闃呭埌鏈� 9锛氭湭璁㈤槄
+	FromId     string       `gorm:"column:from_id" json:"from_id"`
+	Ext        vo.Subscribe `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"`
+	CreateTime int64        `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
+	UpdateTime int64        `gorm:"column:update_time;autoUpdateTime" json:"-"`
+	DeleteTime int64        `gorm:"column:delete_time" json:"-"`
+}
+
+func (s *Subscribe) TableName() string {
+	return "subscribes"
+}
+
+func (s *Subscribe) FindById(id string) error {
+	return db.Table(s.TableName()).First(&s, "id = ?", id).Error
+}
+
+func (s *Subscribe) Save() error {
+	return db.Table(s.TableName()).Save(s).Error
+}
+
+func (s *Subscribe) DeleteById(id string) error {
+	return db.Table(s.TableName()).Where("id = ?", id).Delete(s).Error
+}
+
+func (s *Subscribe) FindAll() ([]Subscribe, error) {
+	var subs []Subscribe
+	if err := db.Table(s.TableName()).Find(&subs).Error; err != nil {
+		return nil, err
+	}
+
+	return subs, nil
+}
diff --git a/pkg/snowflake/snowflake.go b/pkg/snowflake/snowflake.go
new file mode 100644
index 0000000..ad86994
--- /dev/null
+++ b/pkg/snowflake/snowflake.go
@@ -0,0 +1,118 @@
+package snowflake
+
+import (
+	"errors"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	CEpoch         = 1474802888000
+	CWorkerIdBits  = 10 // Num of WorkerId Bits
+	CSenquenceBits = 12 // Num of Sequence Bits
+
+	CWorkerIdShift  = 12
+	CTimeStampShift = 22
+
+	CSequenceMask = 0xfff // equal as getSequenceMask()
+	CMaxWorker    = 0x3ff // equal as getMaxWorkerId()
+)
+
+type IdWorker struct {
+	workerId      int64
+	lastTimeStamp int64
+	sequence      int64
+	maxWorkerId   int64
+	lock          *sync.Mutex
+}
+
+func NewIdWorker(workerId int64) (iw *IdWorker, err error) {
+	iw = new(IdWorker)
+
+	iw.maxWorkerId = getMaxWorkerId()
+
+	if workerId > iw.maxWorkerId || workerId < 0 {
+		return nil, errors.New("worker not fit")
+	}
+	iw.workerId = workerId
+	iw.lastTimeStamp = -1
+	iw.sequence = 0
+	iw.lock = new(sync.Mutex)
+	return iw, nil
+}
+
+func getMaxWorkerId() int64 {
+	return -1 ^ -1<<CWorkerIdBits
+}
+
+func getSequenceMask() int64 {
+	return -1 ^ -1<<CSenquenceBits
+}
+
+// return in ms
+func (iw *IdWorker) timeGen() int64 {
+	return time.Now().UnixNano() / 1000 / 1000
+}
+
+func (iw *IdWorker) timeReGen(last int64) int64 {
+	ts := time.Now().UnixNano() / 1000 / 1000
+	for {
+		if ts <= last {
+			ts = iw.timeGen()
+		} else {
+			break
+		}
+	}
+	return ts
+}
+
+func (iw *IdWorker) NextId() (ts int64, err error) {
+	iw.lock.Lock()
+	defer iw.lock.Unlock()
+	ts = iw.timeGen()
+	if ts == iw.lastTimeStamp {
+		iw.sequence = (iw.sequence + 1) & CSequenceMask
+		if iw.sequence == 0 {
+			ts = iw.timeReGen(ts)
+		}
+	} else {
+		iw.sequence = 0
+	}
+
+	if ts < iw.lastTimeStamp {
+		err = errors.New("Clock moved backwards, Refuse gen id")
+		return 0, err
+	}
+	iw.lastTimeStamp = ts
+	ts = (ts-CEpoch)<<CTimeStampShift | iw.workerId<<CWorkerIdShift | iw.sequence
+	return ts, nil
+}
+
+func ParseId(id int64) (t time.Time, ts int64, workerId int64, seq int64) {
+	seq = id & CSequenceMask
+	workerId = (id >> CWorkerIdShift) & CMaxWorker
+	ts = (id >> CTimeStampShift) + CEpoch
+	t = time.Unix(ts/1000, (ts%1000)*1000000)
+	return
+}
+
+var idGenerater, _ = NewIdWorker(0)
+
+func GenerateId() int64 {
+start:
+	id, err := idGenerater.NextId()
+	if err != nil {
+		goto start
+	}
+	return id
+}
+
+func GenerateIdStr() string {
+start:
+	id, err := idGenerater.NextId()
+	if err != nil {
+		goto start
+	}
+	return strconv.FormatInt(id, 10)
+}
diff --git a/repository/apeRepo.go b/repository/apeRepo.go
new file mode 100644
index 0000000..379ec9a
--- /dev/null
+++ b/repository/apeRepo.go
@@ -0,0 +1,82 @@
+package repository
+
+import (
+	"time"
+
+	"gat1400Exchange/models"
+	"gat1400Exchange/vo"
+)
+
+type ApeRepository struct {
+}
+
+func NewApeRepository() ApeRepository {
+	return ApeRepository{}
+}
+
+func (a *ApeRepository) Keepalive(id string) error {
+	var ape models.Ape
+
+	// 璁惧瀛樺湪
+	if err := ape.FindById(id); err != nil {
+		return nil
+	}
+
+	return ape.Keepalive()
+}
+
+func (a *ApeRepository) Create(id string) error {
+	var ape models.Ape
+
+	// 璁惧瀛樺湪
+	if err := ape.FindById(id); err == nil {
+		return nil
+	}
+
+	ape.Id = id
+	ape.Name = id
+	ape.HeartbeatTime = time.Now().Format("2006-01-02 15:04:05")
+	ape.Ext = vo.Ape{
+		ApeID:            id,
+		Name:             "",
+		Model:            "",
+		IPAddr:           "",
+		IPV6Addr:         "",
+		Port:             0,
+		Longitude:        0,
+		Latitude:         0,
+		PlaceCode:        "",
+		Place:            "",
+		OrgCode:          "",
+		CapDirection:     0,
+		MonitorDirection: "",
+		MonitorAreaDesc:  "",
+		IsOnline:         "2",
+		OwnerApsID:       "",
+		UserID:           "",
+		Password:         "",
+		FunctionType:     "2",
+	}
+
+	return ape.Save()
+}
+
+func (a *ApeRepository) List() ([]models.Ape, error) {
+	var ape models.Ape
+
+	return ape.FindAll()
+}
+
+func (a *ApeRepository) Update(req *vo.Ape) error {
+	var ape models.Ape
+
+	err := ape.FindById(req.ApeID)
+	if err != nil {
+		return err
+	}
+
+	ape.Name = req.Name
+	ape.Ext = *req
+
+	return ape.Save()
+}
diff --git a/repository/captureRepo.go b/repository/captureRepo.go
index c5516e1..cddc2a6 100644
--- a/repository/captureRepo.go
+++ b/repository/captureRepo.go
@@ -212,7 +212,7 @@
 	cacheItem.Save()
 }
 
-func (c CaptureRepository) MsgForward(msg *vo.RequestFaceList) {
+func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) {
 	faceInfo := msg.FaceListObject.FaceObject[0]
 	// 鍖归厤妤煎眰
 	faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local)
diff --git a/repository/subscribeRepo.go b/repository/subscribeRepo.go
new file mode 100644
index 0000000..f5101b6
--- /dev/null
+++ b/repository/subscribeRepo.go
@@ -0,0 +1,44 @@
+package repository
+
+import (
+	"gat1400Exchange/models"
+	"gat1400Exchange/service"
+	"gat1400Exchange/vo"
+)
+
+type SubscribeRepository struct {
+}
+
+func NewSubscribeRepository() SubscribeRepository {
+	return SubscribeRepository{}
+}
+
+func (a *SubscribeRepository) CreateSubscribe(fromId string, subscribe *vo.Subscribe) error {
+	var sub = models.Subscribe{
+		Id:     subscribe.SubscribeID,
+		Status: subscribe.SubscribeStatus,
+		FromId: fromId,
+		Ext:    *subscribe,
+	}
+
+	err := sub.Save()
+	if err != nil {
+		return err
+	}
+
+	service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
+
+	return err
+}
+
+func (a *SubscribeRepository) DeleteSubscribe(id string) error {
+	var sub = models.Subscribe{}
+	err := sub.DeleteById(id)
+	if err != nil {
+		return err
+	}
+
+	service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil)
+
+	return err
+}
diff --git a/routes/routes.go b/routes/routes.go
index 5827b2f..09ff41a 100644
--- a/routes/routes.go
+++ b/routes/routes.go
@@ -35,6 +35,8 @@
 	// 娉ㄥ唽閲囬泦鎺ュ彛璺敱
 	InitCaptureRouters(apiGroup)
 
+	InitSubscribesRouters(apiGroup)
+
 	logger.Info("鍒濆鍖栬矾鐢卞畬鎴愶紒")
 
 	return r
diff --git a/routes/subscribe.go b/routes/subscribe.go
new file mode 100644
index 0000000..9710832
--- /dev/null
+++ b/routes/subscribe.go
@@ -0,0 +1,16 @@
+package routes
+
+import (
+	"gat1400Exchange/controller"
+	"github.com/gin-gonic/gin"
+)
+
+func InitSubscribesRouters(r *gin.RouterGroup) gin.IRoutes {
+	subCtl := controller.NewSubscribeController()
+
+	r.POST("/Subscribes", subCtl.Subscribes)
+	r.DELETE("/Subscribes", subCtl.DeleteSubscribe)
+	r.POST("/SubscribeNotifications", subCtl.Notifications)
+
+	return r
+}
diff --git a/routes/system.go b/routes/system.go
index ba2611f..519498a 100644
--- a/routes/system.go
+++ b/routes/system.go
@@ -17,5 +17,9 @@
 		router.POST("/Keepalive", sysCtl.Keepalive)
 		router.GET("/Time", sysCtl.Time)
 	}
+
+	r.GET("/APEs", sysCtl.ApeList)
+	r.POST("/APEs", sysCtl.ApeUpdate)
+
 	return r
 }
diff --git a/service/device.go b/service/device.go
index acfaf18..86d9b77 100644
--- a/service/device.go
+++ b/service/device.go
@@ -38,12 +38,12 @@
 }
 
 func DeviceInfoReportTask() error {
-	logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer)
-
 	if config.ServeConf.Role == "agent" {
 		return nil
 	}
 
+	logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer)
+
 	if config.ForwardConf.ReportServer == "" {
 		return errors.New("Server addr is empty!")
 	}
diff --git a/service/resend.go b/service/resend.go
index e56ffc1..ac62207 100644
--- a/service/resend.go
+++ b/service/resend.go
@@ -10,15 +10,15 @@
 )
 
 func ResendImageData() {
-	var cacheMod models.Cache
-	cacheItems, _ := cacheMod.FindAll()
-
-	logger.Debug("Start resend task. cache len:%d", len(cacheItems))
 	if err := util.HttpGet(config.ForwardConf.SyncServer); err != nil {
 		logger.Debug("The server cannot be reached. %s", err.Error())
 		return
 	}
 
+	var cacheMod models.Cache
+	cacheItems, _ := cacheMod.FindAll()
+	logger.Debug("Start resend task. cache len:%d", len(cacheItems))
+
 	for _, c := range cacheItems {
 		if c.Type == "1400" {
 			if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess {
diff --git a/service/subscribe.go b/service/subscribe.go
new file mode 100644
index 0000000..574cb8e
--- /dev/null
+++ b/service/subscribe.go
@@ -0,0 +1,213 @@
+package service
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"sync"
+	"time"
+
+	"gat1400Exchange/client"
+	"gat1400Exchange/models"
+	"gat1400Exchange/pkg/logger"
+	"gat1400Exchange/pkg/snowflake"
+	"gat1400Exchange/vo"
+)
+
+var TaskProcMap sync.Map
+var TaskWaitGroup = &sync.WaitGroup{}
+
+type TaskProcInfo struct {
+	cancel context.CancelFunc
+	task   *SubscribeTask
+}
+
+func (t *TaskProcInfo) stop() {
+	t.cancel()
+	t.task = nil
+}
+
+func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
+	logger.Debug("Receive sub msg: %s ", subId)
+
+	proc, isExist := TaskProcMap.Load(subId)
+
+	switch msgType {
+	case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
+		if isExist {
+			logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�")
+			proc.(TaskProcInfo).cancel()
+		}
+
+		CreateTask(conf)
+	case vo.Msg_Type_Delete_Subscribe:
+		if !isExist {
+			return
+		}
+
+		// 鍏抽棴浠诲姟, 骞跺垹闄�
+		proc.(TaskProcInfo).cancel()
+	default:
+		logger.Warn("鏈煡鐨勬秷鎭被鍨�")
+	}
+}
+
+func InitSubscribeTask() error {
+	var s models.Subscribe
+	subList, err := s.FindAll()
+	if err != nil {
+		logger.Error("Find account by channel error:%v", err)
+		return err
+	}
+
+	for idx, _ := range subList {
+		if subList[idx].Status != 0 {
+			continue
+		}
+
+		CreateTask(&subList[idx])
+	}
+
+	return nil
+}
+
+func AddFaceCapture(face *vo.FaceObject) {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).task.AddFace(face)
+		return true
+	})
+
+	logger.Debug("娣诲姞浜鸿劯.")
+	//TaskWaitGroup.Wait()
+}
+
+func StopSubscribeTask() {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).cancel()
+		return true
+	})
+
+	logger.Debug("绛夊緟鎵�鏈変换鍔¢��鍑�.")
+	TaskWaitGroup.Wait()
+}
+
+func CreateTask(conf *models.Subscribe) {
+	logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	newTask := &SubscribeTask{
+		ctx:  ctx,
+		conf: conf,
+	}
+
+	TaskProcMap.Store(conf.Id, TaskProcInfo{
+		cancel: cancel,
+		task:   newTask,
+	})
+
+	go newTask.Start()
+}
+
+type SubscribeTask struct {
+	ctx      context.Context
+	conf     *models.Subscribe
+	faceList []*vo.FaceObject
+	mutex    sync.Mutex
+}
+
+func (task *SubscribeTask) Start() {
+	TaskWaitGroup.Add(1)
+	Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
+
+	for {
+		select {
+		case <-task.ctx.Done():
+			TaskProcMap.Delete(task.conf.Id)
+			TaskWaitGroup.Done()
+			logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
+			return
+		case <-Timer.C:
+			task.Notify()
+		}
+	}
+}
+
+func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
+	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
+		return
+	}
+
+	task.mutex.Lock()
+	defer task.mutex.Unlock()
+
+	task.faceList = append(task.faceList, face)
+}
+
+func (task *SubscribeTask) Notify() {
+	subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
+	for _, subType := range subDetails {
+		triggerTime := time.Now().Format("20060102150405")
+
+		// 涓婃姤璁惧
+		if subType == vo.SubscribeApe {
+			var notification = vo.DeviceNotification{
+				NotificationID:   triggerTime + snowflake.GenerateIdStr(),
+				SubscribeID:      task.conf.Id,
+				Title:            task.conf.Ext.Title,
+				TriggerTime:      triggerTime,
+				ExecuteOperation: 1,
+			}
+
+			var ids []string
+			var apeMod models.Ape
+			apeList, err := apeMod.FindAll()
+			if err != nil {
+				logger.Warn(err.Error())
+				continue
+			} else {
+				for idx, _ := range apeList {
+					ids = append(ids, apeList[idx].Id)
+					notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
+				}
+			}
+
+			notification.InfoIDs = strings.Join(ids, ";")
+
+			var req vo.RequestSubscribeNotification
+			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
+			b, _ := json.Marshal(req)
+			client.Notify(task.conf.Ext.ReceiveAddr, b)
+			continue
+		}
+
+		// 涓婃姤浜鸿劯
+		if subType == vo.SubscribeFace {
+			if len(task.faceList) == 0 {
+				return
+			}
+
+			var notification = vo.FaceNotification{
+				NotificationID: triggerTime + snowflake.GenerateIdStr(),
+				SubscribeID:    task.conf.Id,
+				Title:          task.conf.Ext.Title,
+				TriggerTime:    triggerTime,
+			}
+			var ids []string
+			for idx, _ := range task.faceList {
+				ids = append(ids, task.faceList[idx].FaceID)
+				notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
+			}
+
+			notification.InfoIDs = strings.Join(ids, ";")
+
+			task.mutex.Lock()
+			task.faceList = []*vo.FaceObject{}
+			task.mutex.Unlock()
+
+			var req vo.RequestSubscribeNotification
+			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
+			b, _ := json.Marshal(req)
+			client.Notify(task.conf.Ext.ReceiveAddr, b)
+		}
+	}
+}
diff --git a/vo/ape.go b/vo/ape.go
new file mode 100644
index 0000000..f45c289
--- /dev/null
+++ b/vo/ape.go
@@ -0,0 +1,58 @@
+package vo
+
+import (
+	"database/sql/driver"
+	"encoding/json"
+	"errors"
+)
+
+type Ape struct {
+	ApeID            string  `json:"ApeID"`
+	Name             string  `json:"Name"`
+	Model            string  `json:"Model"`
+	IPAddr           string  `json:"IPAddr"`
+	IPV6Addr         string  `json:"IPV6Addr"`
+	Port             int     `json:"Port"`
+	Longitude        float64 `json:"Longitude"`
+	Latitude         float64 `json:"Latitude"`
+	PlaceCode        string  `json:"PlaceCode"`
+	Place            string  `json:"Place"`
+	OrgCode          string  `json:"OrgCode"`
+	CapDirection     int     `json:"CapDirection"`
+	MonitorDirection string  `json:"MonitorDirection"`
+	MonitorAreaDesc  string  `json:"MonitorAreaDesc"`
+	IsOnline         string  `json:"IsOnline"`
+	OwnerApsID       string  `json:"OwnerApsID"`
+	UserID           string  `json:"UserId"`
+	Password         string  `json:"Password"`
+	FunctionType     string  `json:"FunctionType"`
+	PositionType     string  `json:"PositionType"`
+}
+
+func (a *Ape) Scan(value interface{}) error {
+	b, ok := value.([]byte)
+	if !ok {
+		return errors.New("failed to unmarshal Author value")
+	}
+	var config Ape
+	err := json.Unmarshal(b, &config)
+	if err != nil {
+		return err
+	}
+	*a = config
+	return nil
+}
+
+func (a Ape) Value() (driver.Value, error) {
+	return json.Marshal(a)
+}
+
+type RequestApeList struct {
+	APEListObject struct {
+		APEObject []Ape `json:"APEObject"`
+	} `json:"APEListObject"`
+}
+
+type NotificationApeList struct {
+	APEObject []Ape `json:"APEObject"`
+}
diff --git a/vo/constant.go b/vo/constant.go
index a5c4442..c2352ff 100644
--- a/vo/constant.go
+++ b/vo/constant.go
@@ -26,3 +26,28 @@
 	StatusInvalidJsonContent: "JSON鍐呭鏃犳晥",
 	StatusReboot:             "绯荤粺閲嶅惎涓�",
 }
+
+// 1 妗堬紙浜嬶級浠剁洰褰�
+// 2 鍗曚釜妗堬紙浜嬶級浠跺唴瀹�
+// 4 閲囬泦璁惧鐘舵��
+// 5 閲囬泦绯荤粺鐩綍
+// 6 閲囬泦绯荤粺鐘舵��
+// 7 瑙嗛鍗″彛鐩綍
+// 8 鍗曚釜鍗″彛璁板綍
+// 9 杞﹂亾鐩綍
+// 10 鍗曚釜杞﹂亾璁板綍
+// 13 鑷姩閲囬泦鐨勮溅杈嗕俊鎭�
+// 14 鑷姩閲囬泦鐨勯潪鏈哄姩杞﹁締淇℃伅
+// 15 鑷姩閲囬泦鐨勭墿鍝佷俊鎭�
+// 16 鑷姩閲囬泦鐨勬枃浠朵俊鎭�
+const (
+	SubscribeApe    = "3"  //  閲囬泦璁惧鐩綍
+	SubscribePerson = "11" //  鑷姩閲囬泦鐨勪汉鍛樹俊鎭�
+	SubscribeFace   = "12" //  鑷姩閲囬泦鐨勪汉鑴镐俊鎭�
+)
+
+const (
+	Msg_Type_Create_Subscribe int = iota
+	Msg_Type_Update_Subscribe
+	Msg_Type_Delete_Subscribe
+)
diff --git a/vo/face.go b/vo/face.go
index fd10b74..19e31da 100644
--- a/vo/face.go
+++ b/vo/face.go
@@ -95,3 +95,7 @@
 		FaceObject []FaceObject `json:"FaceObject"`
 	} `json:"FaceListObject"`
 }
+
+type NotificationFaceList struct {
+	FaceObject []FaceObject `json:"FaceObject"`
+}
diff --git a/vo/message.go b/vo/message.go
index 6ce5566..bcb4f33 100644
--- a/vo/message.go
+++ b/vo/message.go
@@ -30,3 +30,7 @@
 	LocalTime    string
 	TimeZone     interface{}
 }
+
+type ResponseStatusList struct {
+	ResponseStatusObject []ResponseStatus
+}
diff --git a/vo/subscribe.go b/vo/subscribe.go
new file mode 100644
index 0000000..29c7022
--- /dev/null
+++ b/vo/subscribe.go
@@ -0,0 +1,76 @@
+package vo
+
+import (
+	"database/sql/driver"
+	"encoding/json"
+	"errors"
+)
+
+type Subscribe struct {
+	SubscribeID           string `json:"SubscribeID"`
+	Title                 string `json:"Title"`
+	SubscribeDetail       string `json:"SubscribeDetail"`
+	ResourceURI           string `json:"ResourceURI"`
+	ApplicantName         string `json:"ApplicantName"`
+	ApplicantOrg          string `json:"ApplicantOrg"`
+	BeginTime             string `json:"BeginTime"` // Kept as string for direct compatibility
+	EndTime               string `json:"EndTime"`   // Kept as string for direct compatibility
+	ReceiveAddr           string `json:"ReceiveAddr"`
+	ReportInterval        int    `json:"ReportInterval"`
+	Reason                string `json:"Reason"`
+	OperateType           int    `json:"OperateType"`
+	SubscribeStatus       int    `json:"SubscribeStatus"`
+	SubscribeCancelOrg    string `json:"SubscribeCancelOrg"`
+	SubscribeCancelPerson string `json:"SubscribeCancelPerson"`
+	CancelTime            string `json:"CancelTime"` // Kept as string for direct compatibility
+	CancelReason          string `json:"CancelReason"`
+}
+
+func (s *Subscribe) Scan(value interface{}) error {
+	b, ok := value.([]byte)
+	if !ok {
+		return errors.New("failed to unmarshal Author value")
+	}
+	var config Subscribe
+	err := json.Unmarshal(b, &config)
+	if err != nil {
+		return err
+	}
+	*s = config
+	return nil
+}
+
+func (s Subscribe) Value() (driver.Value, error) {
+	return json.Marshal(s)
+}
+
+type RequestSubscribe struct {
+	SubscribeListObject struct {
+		SubscribeObject []Subscribe `json:"SubscribeObject"`
+	} `json:"SubscribeListObject"`
+}
+
+type RequestSubscribeNotification struct {
+	SubscribeNotificationListObject struct {
+		SubscribeNotificationObject []interface{} `json:"SubscribeNotificationObject"`
+	} `json:"SubscribeNotificationListObject"`
+}
+
+type DeviceNotification struct {
+	NotificationID   string
+	SubscribeID      string
+	Title            string
+	TriggerTime      string
+	InfoIDs          string
+	ExecuteOperation int
+	DeviceList       NotificationApeList
+}
+
+type FaceNotification struct {
+	NotificationID string
+	SubscribeID    string
+	Title          string
+	TriggerTime    string
+	InfoIDs        string
+	FaceObjectList NotificationFaceList
+}

--
Gitblit v1.8.0