zhangzengfei
2024-09-26 1e180d22fdb13399e9caf31da97a1b5554123102
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package cache
 
import (
    "flag"
    "sync"
    "time"
 
    "sdkCompare/cache/shardmap"
    "sdkCompare/db"
 
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
)
 
var peerSizeNum = flag.Int("peerSize", 3000, "the query number from database")
var threadNum = flag.Int("threadNum", 32, "the number of thread to deal data.")
 
var (
    Unfilled     = "unfilled"
    PreTableName = "dbTable_"
)
 
type AreaMapItem struct {
    sync.Mutex
    Area map[string]*shardmap.ShardMap
}
 
var CaptureDbMap *AreaMapItem
var RealNameDbMap = shardmap.New(uint8(*threadNum))
var KeyPersonDbMap = shardmap.New(uint8(*threadNum))
 
var doOnce sync.Once
 
func InitCache() {
    doOnce.Do(func() {
        flag.Parse()
 
        CaptureDbMap = &AreaMapItem{
            Area: make(map[string]*shardmap.ShardMap),
        }
 
        // 初始化未分类, 没有小区id的档案
        CaptureDbMap.Area[Unfilled] = shardmap.New(uint8(*threadNum))
    })
 
    initDbTablePersonsCache()
    initRealNamePersonsCache()
    initKeyPersonsCache()
}
 
func ReInitDbTablePersonsCache() {
    CaptureDbMap.Lock()
    defer CaptureDbMap.Unlock()
 
    if CaptureDbMap == nil {
        CaptureDbMap = &AreaMapItem{
            Area: make(map[string]*shardmap.ShardMap),
        }
    }
    for tableId, _ := range CaptureDbMap.Area {
        delete(CaptureDbMap.Area, tableId)
    }
 
    initDbTablePersonsCache()
}
func initDbTablePersonsCache() {
    // 缓存底库中的全部人员信息
    var dbpApi db.DbPersons
    total, e := dbpApi.GetPersonTotal()
 
    // 暂时去掉到访小区过滤
    //var psApi db.PersonStatus
    //accessAreas, _ := psApi.GetPersonAccessedAreas()
 
    logger.Debugf("抓拍档案库共有%d条记录", total)
    if e == nil && total > 0 {
        queryEachNum := *peerSizeNum
        qn := int(total) / *threadNum
        if *peerSizeNum < qn {
            queryEachNum = qn
        }
 
        queryT := int(total) / queryEachNum
        if int(total)%queryEachNum > 0 {
            queryT++
        }
 
        startTime := time.Now()
        var wg sync.WaitGroup
 
        for i := 0; i < queryT; i++ {
            j := i * queryEachNum
            wg.Add(1)
            go func(qs int) {
                defer wg.Done()
                dbPersons, err := dbpApi.GetPersonsCacheBase(j, queryEachNum)
                if err != nil {
                    logger.Error(err)
                    return
                }
 
                logger.Debugf("thread:%d, 获取%d条人员信息", queryEachNum, len(dbPersons))
                CaptureDbMap.Lock()
 
                areaId := ""
                for _, value := range dbPersons {
                    areaId = value.AreaId
                    // 没有小区id的人员
                    if areaId == "" {
                        CaptureDbMap.Area[Unfilled].Set(value.Id, value)
                        CaptureDbMap.Area[Unfilled].Settime()
                        continue
                    }
 
                    //for _, areaId := range accessAreas[value.Id] {
                    if _, ok := CaptureDbMap.Area[areaId]; !ok {
                        CaptureDbMap.Area[areaId] = shardmap.New(uint8(*threadNum))
                    }
 
                    CaptureDbMap.Area[areaId].Set(value.Id, value)
                    CaptureDbMap.Area[areaId].Settime()
                    //}
                }
 
                CaptureDbMap.Unlock()
 
            }(j)
        }
        wg.Wait()
 
        logger.Debug("抓拍档案库人员缓存完成用时:", time.Since(startTime))
 
        for k, v := range CaptureDbMap.Area {
            logger.Debugf("Cache area %s items len %d ", k, v.GetLen())
        }
    }
}
 
func initRealNamePersonsCache() {
    var dbApi db.Layouts
    dbPersons, err := dbApi.GetRealNamePersonList()
    if err != nil {
        logger.Error("init real-name persons error,", err.Error())
    }
 
    for _, value := range dbPersons {
        RealNameDbMap.Set(value.Id, value)
    }
 
    logger.Debugf("常住人口共有%d条记录", len(dbPersons))
}
 
func initKeyPersonsCache() {
    var dbApi db.Layouts
    dbPersons, err := dbApi.GetKeyPersonList()
    if err != nil {
        logger.Error("init real-name persons error,", err.Error())
    }
 
    for _, value := range dbPersons {
        KeyPersonDbMap.Set(value.Id, value)
    }
 
    logger.Debugf("重点人员共有%d条记录", len(dbPersons))
}
 
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
func UpdateDbPersonsCacheById(id string) {
    var dbpApi db.DbPersons
    info, err := dbpApi.GetPersonsById(id)
    if err != nil {
        logger.Error(err)
        return
    }
    if info != nil && info.AreaId != "" {
        if _, ok := CaptureDbMap.Area[info.AreaId]; !ok {
            CaptureDbMap.Lock()
            defer CaptureDbMap.Unlock()
            CaptureDbMap.Area[info.AreaId] = shardmap.New(uint8(*threadNum))
        }
        CaptureDbMap.Area[info.AreaId].Set(info.Id, info)
        CaptureDbMap.Area[info.AreaId].Settime()
    }
}
 
func DeleteDbPersonsCacheById(id string) {
    for key, _ := range CaptureDbMap.Area {
        CaptureDbMap.Area[key].Del(id)
    }
}
 
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
    CaptureDbMap.Lock()
    defer CaptureDbMap.Unlock()
    if _, ok := CaptureDbMap.Area[tableId]; !ok {
        CaptureDbMap.Area[tableId] = shardmap.New(uint8(*threadNum))
    }
    var ei = protomsg.Esinfo{
        Id:          id,
        Tableid:     tableId,
        FaceFeature: faceFeature,
        Enable:      enable,
        CarNo:       carNo,
    }
    CaptureDbMap.Area[tableId].Set(id, &ei)
    logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
}
 
func RealTimeDelPersonFromCache(tableId string, id string) {
    logger.Debug("Delete person from cache, tableId:", tableId, ",id:", id)
    CaptureDbMap.Lock()
    defer CaptureDbMap.Unlock()
    if _, ok := CaptureDbMap.Area[tableId]; ok {
        CaptureDbMap.Area[tableId].Del(id)
        logger.Debug("DelPerson ok success")
    } else {
        logger.Error("tableId:", tableId, " not exist")
    }
}
 
func RealTimeDelTable(tableId string) {
    logger.Debug("RealTimeDelTable tableId:", tableId)
    CaptureDbMap.Lock()
    defer CaptureDbMap.Unlock()
 
    if dtM, ok := CaptureDbMap.Area[PreTableName]; ok {
        dtM.Del(tableId)
    }
    if _, ok := CaptureDbMap.Area[tableId]; ok {
        delete(CaptureDbMap.Area, tableId)
    }
}
 
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
func RealTimeUpdateTable(tableId string, enable int32) {
    logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
    CaptureDbMap.Lock()
    defer CaptureDbMap.Unlock()
 
    if _, ok := CaptureDbMap.Area[PreTableName]; !ok {
        CaptureDbMap.Area[PreTableName] = shardmap.New(uint8(*threadNum))
    }
    CaptureDbMap.Area[PreTableName].Set(tableId, enable == 1)
}
 
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
    if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
        if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
            RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
        } else if changeMsg.Action == protomsg.DbAction_Delete {
            RealTimeDelTable(changeMsg.TableId[0])
        }
    } else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
        if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
            RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
        } else if changeMsg.Action == protomsg.DbAction_Delete {
            RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
        }
    }
}