From fca319958029fa924308e50cb61202d7d6ff5008 Mon Sep 17 00:00:00 2001
From: zhaoqingang <zhaoqg0118@163.com>
Date: Wed, 19 Feb 2025 13:33:37 +0800
Subject: [PATCH] Pause the gather model

---
 models/gather_model.go |  177 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 137 insertions(+), 40 deletions(-)

diff --git a/models/gather_model.go b/models/gather_model.go
index 909c01f..d0c2756 100644
--- a/models/gather_model.go
+++ b/models/gather_model.go
@@ -4,33 +4,45 @@
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
-	"github.com/elastic/go-elasticsearch/v6"
 	"log"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/elastic/go-elasticsearch/v6"
+
 	"model-engine/config"
 	"model-engine/db"
+	"model-engine/pkg/logger"
 	"model-engine/pkg/set"
 	"model-engine/service"
-	"strings"
-	"time"
 )
 
 type GatherModel struct {
 	OrgIds         []interface{} `json:"-"`
 	AreaIds        []interface{} `json:"-"`
-	Building       string        `gorm:"type:varchar(255)" json:"building"`    //building
-	Floor          string        `gorm:"type:varchar(255)" json:"floor"`       //floor
-	AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  //alarm method
-	PersonType     string        `gorm:"type:varchar(255);" json:"personType"` //person type
-	GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       //number of gathered persons
-	AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      //appearance interval, in seconds
-	DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         //look-back window, in days
-	Threshold      int           `gorm:"type:int;" json:"threshold" `          //number of occurrences required
+	Building       string        `gorm:"type:varchar(255)" json:"building"`    // building
+	Floor          string        `gorm:"type:varchar(255)" json:"floor"`       // floor
+	AlarmType      db.AlarmType  `gorm:"type:varchar(255);" json:"alarmType"`  // alarm method
+	PersonType     string        `gorm:"type:varchar(255);" json:"personType"` // person type
+	GatherPersons  int           `gorm:"type:int;" json:"gatherPersons"`       // number of gathered persons
+	AppearInterval int           `gorm:"type:int;" json:"appearInterval"`      // appearance interval, in seconds
+	DaysWindow     int           `gorm:"type:int;" json:"daysWindow" `         // look-back window, in days
+	Threshold      int           `gorm:"type:int;" json:"threshold" `          // number of occurrences required
 	Task           *db.ModelTask
 }
 
-func (m *GatherModel) Init(task *db.ModelTask) error {
+type ProcessedRecord struct {
+	UniqueKey string    // unique identifier for the record
+	Timestamp time.Time // time at which the record was marked as processed
+}
 
+func (m *GatherModel) Init(task *db.ModelTask) error {
+	if len(task.DomainUnitIds) == 0 {
+		return errors.New("empty domain set")
+	}
 	orgIds, areaIds, err := service.GetOrgIdsAndAreaIdsByDomainUnitIds(task.DomainUnitIds)
 	if err != nil {
 		return err
@@ -43,11 +55,40 @@
 	m.Floor = task.Floor
 	m.AlarmType = task.AlarmType
 	m.PersonType = task.PersonType
-	m.GatherPersons = task.GatherPersons
-	m.AppearInterval = task.AppearInterval
-	m.DaysWindow = task.DaysWindow
-	m.Threshold = task.Threshold
-	fmt.Println("GatherModel init finish ...")
+
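+	// Rule values arrive as interface{} (float64 for JSON numbers), so each threshold is asserted and converted to int below.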
+	for _, v := range task.Rules {
+		if v.Alias == "gatherPersons" {
+			if val, ok := v.Value.(float64); ok {
+				m.GatherPersons = int(val)
+			}
+		}
+
+		if v.Alias == "appearInterval" {
+			if val, ok := v.Value.(float64); ok {
+				m.AppearInterval = int(val)
+			}
+		}
+
+		if v.Alias == "daysWindow" {
+			if val, ok := v.Value.(float64); ok {
+				m.DaysWindow = int(val)
+			}
+		}
+
+		if v.Alias == "threshold" {
+			if val, ok := v.Value.(float64); ok {
+				m.Threshold = int(val)
+			}
+		}
+	}
+
+	logger.Debugf("GatherModel init finished. task id:%s, name:%s, rule:%+v\n", task.ID, task.Name, m)
+
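+	// A zero value means the corresponding rule was missing or not numeric; treat that as a configuration error.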
+	if m.GatherPersons == 0 || m.AppearInterval == 0 || m.DaysWindow == 0 || m.Threshold == 0 {
+		logger.Warnf("invalid parameters. task id:%s, name:%s\n", task.ID, task.Name)
+		return errors.New("invalid parameters")
+	}
+
 	return nil
 }
 
@@ -59,11 +100,24 @@
 	OrgId          string `json:"orgId"`
 	Building       string `json:"building"`
 	Floor          string `json:"floor"`
-	GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  //number of gathered persons
-	AppearInterval int    `gorm:"type:int;" json:"appearInterval"` //appearance interval, in seconds
+	GatherPersons  int    `gorm:"type:int;" json:"gatherPersons"`  // number of gathered persons
+	AppearInterval int    `gorm:"type:int;" json:"appearInterval"` // appearance interval, in seconds
 }
 
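+// processed is a package-level, in-memory dedup cache: entries survive across Run calls but are lost on restart.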
+var (
+	processed       sync.Map          // stores already-processed records
+	retentionWindow = 100 * time.Hour // keep only the most recent 100 hours of records
+)
+
 func (m *GatherModel) Run() error {
+	// Clean up records that have aged out of the retention window.
+	processed.Range(func(key, value any) bool {
+		if value.(ProcessedRecord).Timestamp.Before(time.Now().Add(-retentionWindow)) {
+			processed.Delete(key)
+		}
+		return true
+	})
+
 	records, err := queryElasticsearch(db.GetEsClient(), m)
 	if err != nil {
 		log.Fatalf("Failed to query Elasticsearch: %v", err)
@@ -73,7 +127,30 @@
 		return nil
 	}
 
-	aggregation, err := analyzeAndAggregate(records)
+	newRecords := make([]*GatherRecord, 0)
+
+	// Deduplicate before aggregation: only records that have not been seen yet are kept.
+	for _, record := range records {
+		// Build a unique key from the document number and capture time.
+		uniqueKey := fmt.Sprintf("%s-%s", record.DocumentNumber, record.PicDate)
+
+		// Skip records that have already been processed.
+		if _, exists := processed.Load(uniqueKey); exists {
+			continue
+		}
+
+		// Mark the record as processed.
+		processed.Store(uniqueKey, ProcessedRecord{
+			UniqueKey: uniqueKey,
+			Timestamp: time.Now(),
+		})
+		newRecords = append(newRecords, record)
+	}
+	if len(newRecords) == 0 {
+		return nil
+	}
+
+	aggregation, err := analyzeAndAggregate(newRecords)
 	if err != nil {
 		log.Fatalf("Failed to analyze and aggregate data: %v", err)
 	}
@@ -89,22 +166,34 @@
 		return err
 	}
 	event := strings.Join(typeNames, ",")
-	for location, persons := range aggregation {
-		result := &db.ModelTaskResults{
-			Title:       m.Task.Name,
-			Event:       fmt.Sprintf("%s/%d人", event, len(persons)),
-			ModelID:     m.Task.ModelID,
-			ModelTaskID: m.Task.ID,
-			CommunityId: location.CommunityId,
-			OrgID:       location.OrgId,
-			ObjectIds:   strings.Join(persons.Elements(), ","),
-			Location:    location.Location,
-			Building:    location.Building,
-			Floor:       location.Floor,
+	for lt, persons := range aggregation {
+		if persons.Size() == 0 {
+			continue
 		}
+		personIds := persons.Elements()
+		result := &db.ModelTaskResults{
+			Title:         m.Task.Name,
+			Event:         fmt.Sprintf("%s/%d人", event, len(persons)),
+			ModelID:       m.Task.ModelID,
+			ModelTaskID:   m.Task.ID,
+			CommunityId:   lt.CommunityId,
+			OrgID:         lt.OrgId,
+			ObjectIds:     strings.Join(personIds, ","),
+			Location:      lt.Location,
+			Building:      lt.Building,
+			Floor:         lt.Floor,
+			PicDate:       lt.Time,
+			FirstPersonID: personIds[0],
+		}
+
 		results = append(results, result)
 	}
 	return service.SaveTaskResults(results)
+}
+
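+// KeepAlive records a heartbeat by updating the task's last_run_time.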
+func (m *GatherModel) KeepAlive() error {
+	db.GetDB().Model(m.Task).Where("id = ?", m.Task.ID).Update("last_run_time", time.Now())
+	return nil
 }
 
 func (m *GatherModel) Shutdown() error {
@@ -113,7 +202,7 @@
 	return nil
 }
 
-func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]GatherRecord, error) {
+func queryElasticsearch(esClient *elasticsearch.Client, gatherModel *GatherModel) ([]*GatherRecord, error) {
 	var buf bytes.Buffer
 	now := time.Now()
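+	// Look back DaysWindow days from the current time.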
 	start := now.Add(-time.Duration(gatherModel.DaysWindow) * 24 * time.Hour)
@@ -282,7 +371,7 @@
 	}
 
 	// Parse the aggregation result
-	var records []GatherRecord
+	var records []*GatherRecord
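+	// Flatten the nested aggregation buckets returned by Elasticsearch into GatherRecord values.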
 	if aggs, ok := result["aggregations"].(map[string]interface{}); ok {
 		if orgBuckets, ok := aggs["orgs"].(map[string]interface{})["buckets"].([]interface{}); ok {
 			for _, orgBucket := range orgBuckets {
@@ -307,7 +396,7 @@
 										if gatherEvents, ok := floorBucket.(map[string]interface{})["gather_events"].(map[string]interface{})["buckets"].([]interface{}); ok {
 											for _, eventBucket := range gatherEvents {
 												key := int64(eventBucket.(map[string]interface{})["key"].(float64)) / 1000 // convert milliseconds to seconds
-												timestamp := time.Unix(key, 0).Format("2006-01-02 15:04:05")
+												timestamp := time.Unix(key, 0).UTC().Format("2006-01-02 15:04:05")
 
 												// Parse persons
 												if peopleBuckets, ok := eventBucket.(map[string]interface{})["people"].(map[string]interface{})["buckets"].([]interface{}); ok {
@@ -315,7 +404,7 @@
 														documentNumber := person.(map[string]interface{})["key"].(string)
 
 														// Build the GatherRecord struct
-														record := GatherRecord{
+														record := &GatherRecord{
 															PicDate:        timestamp,
 															DocumentNumber: documentNumber,
 															CommunityId:    communityId,
@@ -344,16 +433,20 @@
 	return records, nil
 }
 
-type GatherLocation struct {
+type GatherLocationTime struct {
 	CommunityId string
 	OrgId       string
 	Building    string
 	Floor       string
 	Location    string
+	Time        string
 }
 
-func analyzeAndAggregate(records []GatherRecord) (map[GatherLocation]set.StringSet, error) {
-	aggregation := make(map[GatherLocation]set.StringSet)
+func analyzeAndAggregate(records []*GatherRecord) (map[GatherLocationTime]set.StringSet, error) {
+	if len(records) == 0 {
+		return nil, nil
+	}
+	aggregation := make(map[GatherLocationTime]set.StringSet)
 	domainIds := set.NewStringSet()
 	for _, record := range records {
 		domainIds.Add(record.CommunityId)
@@ -365,16 +458,20 @@
 	}
 
 	for _, record := range records {
+		if record.DocumentNumber == "" {
+			continue
+		}
 		if domains[record.CommunityId] == nil {
 			continue
 		}
 
-		location := GatherLocation{
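+		// The grouping key now includes the event time, so persons are aggregated per location per gather event.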
+		location := GatherLocationTime{
 			CommunityId: record.CommunityId,
 			OrgId:       record.OrgId,
 			Building:    record.Building,
 			Floor:       record.Floor,
 			Location:    fmt.Sprintf("%s%s%s", domains[record.CommunityId].Name, record.Building, record.Floor),
+			Time:        record.PicDate,
 		}
 		if aggregation[location] == nil {
 			aggregation[location] = set.NewStringSet()

--
Gitblit v1.8.0