zhangzengfei
2024-04-07 de14a5297d1103c7522e5473131e04e0a403c22c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
 
import (
    "context"
    "flag"
    "path"
    "strconv"
    "time"
 
    "sdkCompare/cache"
    "sdkCompare/compare"
    "sdkCompare/config"
    "sdkCompare/db"
    "sdkCompare/proto/facecompare"
 
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/version.git"
    "github.com/golang/protobuf/proto"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
 
const procName = "faceCompare"
 
func init() {
    flag.Parse()
    vaversion.Usage()
}
 
func main() {
    err := config.Init()
    if err != nil {
        return
    }
 
    var logFile = path.Join(config.LogConf.Path, "faceCompare.log")
 
    // 日志初始化
    logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
    logger.Info("logger init success !")
 
    serveUrl := "tcp://0.0.0.0:"
    if err := db.ConnectDB(); err != nil {
        logger.Error(err.Error())
        return
    }
 
    cache.InitDbTablePersons()
    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
 
    logger.Infof("%s serve url:%s", procName, serveUrl)
 
    Recv(serveUrl)
}
 
func Recv(url string) {
    var sock mangos.Socket
    var err error
    var msg []byte
    var ctx, _ = context.WithCancel(context.Background())
    if sock, err = rep.NewSocket(); err != nil {
        logger.Error("new rep socket err:", err)
    }
 
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        logger.Error("listen on rep socket err:", err)
    }
 
    for {
        select {
        case <-ctx.Done():
            logger.Info("ctx done")
            return
        default:
            msg, err = sock.Recv()
            if err != nil || len(msg) <= 0 {
                continue
            }
 
            var request facecompare.CompareRequest
            err = proto.Unmarshal(msg, &request)
            if err != nil {
                logger.Warn("CompareRequest json unmarshal error")
                continue
            }
 
            var result []byte
            if request.CompareType == facecompare.CompareType_Compare {
                var compareArgInfo protomsg.CompareArgs
                var cacheChangeInfo protomsg.EsPersonCacheChange
                if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
                    timeStart := time.Now()
                    result = compare.GetComparePersonBaseInfo(compareArgInfo)
                    logger.Debug("用时:", time.Since(timeStart))
                } else if err = proto.Unmarshal(request.Payload, &cacheChangeInfo); err == nil {
                    cache.UpdateCache(&cacheChangeInfo)
                } else {
                    logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
                    continue
                }
            } else if request.CompareType == facecompare.CompareType_UpdateCache {
                var compareEvent protomsg.CompareEvent
                if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
                    if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                        cache.ReInitDbTablePersonsCache()
                    } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                        id := string(compareEvent.Payload)
                        cache.UpdateDbPersonsCacheById(id)
                        logger.Info("--------------更新人员缓存, id: ", id)
                    }
                } else {
                    logger.Warn("CompareEvent json unmarshal error")
                    continue
                }
            }
 
            err = sock.Send(result)
            if err != nil {
                logger.Warn("send reply err:", err.Error())
            }
        }
    }
}