package models import ( "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "github.com/gogo/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/req" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" "strconv" "time" "vamicro/config" ) var syncSdkCompareCacheChan = make(chan []byte, 512) func PublishSdkCompareCacheChange(b []byte) { syncSdkCompareCacheChan <- b } func InitSync() { go func() { logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg") var sock mangos.Socket var err error if sock, err = req.NewSocket(); err != nil { logger.Debug("comp cache can't new req socket:%s", err.Error()) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port) logger.Debug("doUpdateCompareCacheRequest url:", url) if err = sock.Dial(url); err != nil { logger.Debug("comp cache can't dial on req socket:%s", err.Error()) return } sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100) sock.SetOption(mangos.OptionRecvDeadline, time.Second*2) defer sock.Close() for { select { case b := <-syncSdkCompareCacheChan: logger.Debug("SyncSdkCompareCache in,len(b):", len(b)) // doUpdateCompareCacheRequest(b) if err = sock.Send(b); err != nil { logger.Debug("comp cache can't send message on push socket:%s", err.Error()) continue } if msg, err := sock.Recv(); err != nil { logger.Debugf("comp cache sock.Recv receive err:%s\n", err.Error()) continue } else { logger.Debug("comp cache sock.Recv msg:", string(msg)) } } } }() } func SyncSdkCompareCache(analyServerId string, cacheChangeType protomsg.EsCacheChanged, tableId string, personId string, faceFeature string, dbAction protomsg.DbAction, enable int32, carNo string) { msg := protomsg.EsPersonCacheChange{ Type: cacheChangeType, TableId: []string{tableId}, PersonId: personId, Feature: faceFeature, Action: dbAction, Enable: enable, CarNo: carNo, } b, err := proto.Marshal(&msg) logger.Debug("SyncSdkCompareCache proto.err:", err) if err != nil { return } logger.Debug("SyncSdkCompareCache analyServerId:", analyServerId) PublishSdkCompareCacheChange(b) } func doUpdateCompareCacheRequest(b []byte) { logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg") var sock mangos.Socket var err error if sock, err = req.NewSocket(); err != nil { logger.Debug("comp cache can't new req socket:%s", err.Error()) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port) logger.Debug("doUpdateCompareCacheRequest url:", url) if err = sock.Dial(url); err != nil { logger.Debug("comp cache can't dial on req socket:%s", err.Error()) return } sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100) sock.SetOption(mangos.OptionRecvDeadline, time.Second*2) if err = sock.Send(b); err != nil { logger.Debug("comp cache can't send message on push socket:%s", err.Error()) return } if msg, err := sock.Recv(); err != nil { logger.Debug("comp cache sock.Recv receive err:%s", err.Error()) return } else { logger.Debug("comp cache sock.Recv msg:", string(msg)) } sock.Close() } func ReInitDbPersonCompareData() { logger.Debug("加入集群后重新初始化底库比对数据") var sock mangos.Socket var err error if sock, err = req.NewSocket(); err != nil { logger.Debug("ReInitDbPersonCompareData can't new req socket:%s", err.Error()) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port) if err = sock.Dial(url); err != nil { logger.Debug("ReInitDbPersonCompareData can't dial on req socket:%s", err.Error()) return } sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024) //sock.SetOption(mangos.OptionRecvDeadline, time.Second*10) msg := protomsg.CompareEvent{ EventType: protomsg.CompareEventType_ReInitCache, Payload: []byte(""), } b, err := proto.Marshal(&msg) if err != nil { logger.Debug("ReInitDbPersonCompareData msg marshal err", err) } if err = sock.Send(b); err != nil { logger.Debug("ReInitDbPersonCompareData can't send message on push socket:%s", err.Error()) return } if msg, err := sock.Recv(); err != nil { logger.Debug("ReInitDbPersonCompareData sock.Recv receive err:%s", err.Error()) return } else { logger.Debug("ReInitDbPersonCompareData sock.Recv msg:", string(msg)) } sock.Close() }