package models
|
|
import (
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"github.com/gogo/protobuf/proto"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
"strconv"
|
"time"
|
"vamicro/config"
|
)
|
|
// syncSdkCompareCacheChan queues serialized cache-change messages for the
// goroutine started by InitSync. The buffer holds up to 512 messages.
var syncSdkCompareCacheChan = make(chan []byte, 512)

// PublishSdkCompareCacheChange enqueues the already-serialized message b on
// syncSdkCompareCacheChan.
//
// The send is a plain channel send: it returns immediately while the buffer
// has room and blocks the caller once the buffer is full.
func PublishSdkCompareCacheChange(b []byte) {
	syncSdkCompareCacheChan <- b
}
|
|
func InitSync() {
|
go func() {
|
logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")
|
var sock mangos.Socket
|
var err error
|
if sock, err = req.NewSocket(); err != nil {
|
logger.Debug("comp cache can't new req socket:%s", err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
|
logger.Debug("doUpdateCompareCacheRequest url:", url)
|
if err = sock.Dial(url); err != nil {
|
logger.Debug("comp cache can't dial on req socket:%s", err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
|
sock.SetOption(mangos.OptionRecvDeadline, time.Second*2)
|
defer sock.Close()
|
|
for {
|
select {
|
case b := <-syncSdkCompareCacheChan:
|
logger.Debug("SyncSdkCompareCache in,len(b):", len(b))
|
// doUpdateCompareCacheRequest(b)
|
if err = sock.Send(b); err != nil {
|
logger.Debug("comp cache can't send message on push socket:%s", err.Error())
|
continue
|
}
|
if msg, err := sock.Recv(); err != nil {
|
logger.Debugf("comp cache sock.Recv receive err:%s\n", err.Error())
|
continue
|
} else {
|
logger.Debug("comp cache sock.Recv msg:", string(msg))
|
}
|
}
|
}
|
}()
|
}
|
|
func SyncSdkCompareCache(analyServerId string, cacheChangeType protomsg.EsCacheChanged, tableId string, personId string, faceFeature string, dbAction protomsg.DbAction, enable int32, carNo string) {
|
msg := protomsg.EsPersonCacheChange{
|
Type: cacheChangeType,
|
TableId: []string{tableId},
|
PersonId: personId,
|
Feature: faceFeature,
|
Action: dbAction,
|
Enable: enable,
|
CarNo: carNo,
|
}
|
b, err := proto.Marshal(&msg)
|
logger.Debug("SyncSdkCompareCache proto.err:", err)
|
if err != nil {
|
return
|
}
|
logger.Debug("SyncSdkCompareCache analyServerId:", analyServerId)
|
|
PublishSdkCompareCacheChange(b)
|
}
|
|
func doUpdateCompareCacheRequest(b []byte) {
|
logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")
|
var sock mangos.Socket
|
var err error
|
if sock, err = req.NewSocket(); err != nil {
|
logger.Debug("comp cache can't new req socket:%s", err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
|
logger.Debug("doUpdateCompareCacheRequest url:", url)
|
if err = sock.Dial(url); err != nil {
|
logger.Debug("comp cache can't dial on req socket:%s", err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
|
sock.SetOption(mangos.OptionRecvDeadline, time.Second*2)
|
if err = sock.Send(b); err != nil {
|
logger.Debug("comp cache can't send message on push socket:%s", err.Error())
|
return
|
}
|
if msg, err := sock.Recv(); err != nil {
|
logger.Debug("comp cache sock.Recv receive err:%s", err.Error())
|
return
|
} else {
|
logger.Debug("comp cache sock.Recv msg:", string(msg))
|
}
|
sock.Close()
|
}
|
|
func ReInitDbPersonCompareData() {
|
logger.Debug("加入集群后重新初始化底库比对数据")
|
var sock mangos.Socket
|
var err error
|
if sock, err = req.NewSocket(); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't new req socket:%s", err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
|
if err = sock.Dial(url); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't dial on req socket:%s", err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024)
|
//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
|
msg := protomsg.CompareEvent{
|
EventType: protomsg.CompareEventType_ReInitCache,
|
Payload: []byte(""),
|
}
|
b, err := proto.Marshal(&msg)
|
if err != nil {
|
logger.Debug("ReInitDbPersonCompareData msg marshal err", err)
|
}
|
if err = sock.Send(b); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't send message on push socket:%s", err.Error())
|
return
|
}
|
if msg, err := sock.Recv(); err != nil {
|
logger.Debug("ReInitDbPersonCompareData sock.Recv receive err:%s", err.Error())
|
return
|
} else {
|
logger.Debug("ReInitDbPersonCompareData sock.Recv msg:", string(msg))
|
}
|
sock.Close()
|
}
|