zhangzengfei
2024-03-30 7e7a2baab4c5ccb9dd8dc5104041c9efe718ed9a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package cache
 
import (
    "flag"
    "sync"
    "time"
 
    "sdkCompare/cache/shardmap"
    "sdkCompare/db"
 
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
)
 
var querynum = flag.Int("querynum", 3000, "the query number from database")
var threadnum = flag.Int("threadnum", 32, "the number of thread to deal data.")
 
var (
    Unfiled     = "unfiled"
    PRE_DBTABLE = "dbTable_"
)
 
type CmapItem struct {
    sync.Mutex
    Area map[string]*shardmap.ShardMap
}
 
var CacheMap *CmapItem
var doOnce sync.Once
 
func ReInitDbTablePersonsCache() {
    CacheMap.Lock()
    defer CacheMap.Unlock()
    if CacheMap == nil {
        CacheMap = &CmapItem{
            Area: make(map[string]*shardmap.ShardMap),
        }
    }
    for tableId, _ := range CacheMap.Area {
        delete(CacheMap.Area, tableId)
    }
 
    initDbTablePersonsCache()
}
 
func InitDbTablePersons() {
    doOnce.Do(func() {
        flag.Parse()
 
        CacheMap = &CmapItem{
            Area: make(map[string]*shardmap.ShardMap),
        }
 
        // 初始化未分类, 没有小区id的档案
        CacheMap.Area[Unfiled] = shardmap.New(uint8(*threadnum))
    })
 
    initDbTablePersonsCache()
}
 
func initDbTablePersonsCache() {
    // 缓存底库中的全部人员信息
    var dbpApi db.DbPersons
    total, e := dbpApi.GetPersonTotal("")
 
    var psApi db.PersonStatus
    accessAreas, _ := psApi.GetPersonAccessedAreas()
 
    logger.Debugf("所有底库共有%d条记录", total)
    if e == nil && total > 0 {
        queryEachNum := *querynum
        qn := int(total) / *threadnum
        if *querynum < qn {
            queryEachNum = qn
        }
        queryT := int(total) / queryEachNum
        if int(total)%queryEachNum > 0 {
            queryT++
        }
        temptime := time.Now()
        var wg sync.WaitGroup
 
        for i := 0; i < queryT; i++ {
            j := i * queryEachNum
            wg.Add(1)
            go func(qs int) {
                defer wg.Done()
                dbPersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
                if err != nil {
                    logger.Error(err)
                    return
                }
                logger.Debugf("获取%d条人员信息", len(dbPersons))
                CacheMap.Lock()
 
                areaId := ""
                for _, value := range dbPersons {
                    areaId = value.AreaId
                    // 没有小区id的人员
                    if areaId == "" {
                        CacheMap.Area[Unfiled].Set(value.Id, value)
                        CacheMap.Area[Unfiled].Settime()
                        continue
                    }
 
                    for _, areaId := range accessAreas[value.Id] {
                        if _, ok := CacheMap.Area[areaId]; !ok {
                            CacheMap.Area[areaId] = shardmap.New(uint8(*threadnum))
                        }
 
                        CacheMap.Area[areaId].Set(value.Id, value)
                        CacheMap.Area[areaId].Settime()
                    }
                }
 
                CacheMap.Unlock()
 
            }(j)
        }
        wg.Wait()
        logger.Debug("底库人员缓存完成用时:", time.Since(temptime))
    }
}
 
// UpdateDbPersonsCacheById 更新缓存中的全部人员信息
func UpdateDbPersonsCacheById(id string) {
    var dbpApi db.DbPersons
    info, err := dbpApi.GetPersonsCompareCacheById(id)
    if err != nil {
        logger.Error(err)
        return
    }
    if info.AreaId != "" {
        CacheMap.Lock()
        defer CacheMap.Unlock()
        if _, ok := CacheMap.Area[info.AreaId]; !ok {
            CacheMap.Area[info.AreaId] = shardmap.New(uint8(*threadnum))
        }
        CacheMap.Area[info.AreaId].Set(info.Id, info)
        CacheMap.Area[info.AreaId].Settime()
    }
}
 
func RealTimeAddPersonInfoToCache(tableId string, id string, faceFeature string, enable int32, carNo string) {
    CacheMap.Lock()
    defer CacheMap.Unlock()
    if _, ok := CacheMap.Area[tableId]; !ok {
        CacheMap.Area[tableId] = shardmap.New(uint8(*threadnum))
    }
    var ei = protomsg.Esinfo{
        Id:          id,
        Tableid:     tableId,
        FaceFeature: faceFeature,
        Enable:      enable,
        CarNo:       carNo,
    }
    CacheMap.Area[tableId].Set(id, &ei)
    logger.Debug("id:", id, ",tableId:", ",len(faceFeature):", len(faceFeature), ",tableId:", tableId, ",enable:", enable)
}
 
func RealTimeDelPersonFromCache(tableId string, id string) {
    logger.Debug("DelPersonFromCache,tableId:", tableId, ",id:", id)
    CacheMap.Lock()
    defer CacheMap.Unlock()
    if _, ok := CacheMap.Area[tableId]; ok {
        CacheMap.Area[tableId].Del(id)
        logger.Debug("DelPerson ok success")
    } else {
        logger.Error("tableId:", tableId, " not exist")
    }
}
 
func RealTimeDelTable(tableId string) {
    logger.Debug("RealTimeDelTable tableId:", tableId)
    CacheMap.Lock()
    defer CacheMap.Unlock()
 
    if dtM, ok := CacheMap.Area[PRE_DBTABLE]; ok {
        dtM.Del(tableId)
    }
    if _, ok := CacheMap.Area[tableId]; ok {
        delete(CacheMap.Area, tableId)
    }
}
 
// 使底库生效,将底库中的所有生效状态的人特征添加到缓存
func RealTimeUpdateTable(tableId string, enable int32) {
    logger.Debug("RealTimeUpdateTable tableId:", tableId, ",enable:", enable)
    CacheMap.Lock()
    defer CacheMap.Unlock()
 
    if _, ok := CacheMap.Area[PRE_DBTABLE]; !ok {
        CacheMap.Area[PRE_DBTABLE] = shardmap.New(uint8(*threadnum))
    }
    CacheMap.Area[PRE_DBTABLE].Set(tableId, enable == 1)
}
 
func UpdateCache(changeMsg *protomsg.EsPersonCacheChange) {
    if changeMsg.Type == protomsg.EsCacheChanged_T_DbTable {
        if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
            RealTimeUpdateTable(changeMsg.TableId[0], changeMsg.Enable)
        } else if changeMsg.Action == protomsg.DbAction_Delete {
            RealTimeDelTable(changeMsg.TableId[0])
        }
    } else if changeMsg.Type == protomsg.EsCacheChanged_T_DbTablePerson {
        if changeMsg.Action == protomsg.DbAction_Insert || changeMsg.Action == protomsg.DbAction_Update {
            RealTimeAddPersonInfoToCache(changeMsg.TableId[0], changeMsg.PersonId, changeMsg.Feature, changeMsg.Enable, changeMsg.CarNo)
        } else if changeMsg.Action == protomsg.DbAction_Delete {
            RealTimeDelPersonFromCache(changeMsg.TableId[0], changeMsg.PersonId)
        }
    }
}