package models
|
|
|
import (
|
sdb "basic.com/syncdb.git"
|
"dbserver/extend/config"
|
"dbserver/extend/logger"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
var Agent *sdb.Agent
|
var SyncTables = []string{ "area", "camera_area", "cameras", "cluster", "cluster_node", "dictionary", "gb28181_config" }
|
|
var syncSdkCompareCacheChan = make(chan []byte,0)
|
|
func PublishSdkCompareCacheChange(b []byte) {
|
syncSdkCompareCacheChan <- b
|
}
|
func InitAgent() {
|
|
sdb.InitLocalDb(GetDB())
|
|
var lc LocalConfig
|
if lc.Select() != nil || lc.ServerId == "" {
|
return
|
}
|
var clusterE Cluster
|
clusters, err := clusterE.FindAll()
|
if err ==nil && clusters !=nil && len(clusters) >0 {
|
c := clusters[0]
|
var nodeE Node
|
nodes, e := nodeE.FindNodesByClusterId(c.ClusterId)
|
if e == nil && nodes !=nil && len(nodes) >0 {
|
var nodeIps []string
|
for _,n :=range nodes {
|
if n.NodeId != lc.ServerId {
|
nodeIps = append(nodeIps, n.NodeIp)
|
}
|
}
|
Agent,err = sdb.Init(c.ClusterId, c.Password, lc.ServerId, nodeIps)
|
if Agent != nil {
|
logger.Debug("sync.Agent init success!")
|
} else {
|
logger.Debug("sync.Agent init fail!")
|
}
|
|
}
|
}
|
go func() {
|
for {
|
select {
|
case scMsg := <-sdb.SyncDbTablePersonCacheChan:{
|
PublishSdkCompareCacheChange(scMsg)
|
}
|
default:
|
|
}
|
}
|
}()
|
go func() {
|
for {
|
select {
|
case b := <-syncSdkCompareCacheChan:{
|
logger.Debug("SyncSdkCompareCache in,len(b):",len(b))
|
doUpdateCompareCacheRequest(b)
|
}
|
default:
|
|
}
|
}
|
}()
|
}
|
|
func doUpdateCompareCacheRequest(b []byte) {
|
logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")
|
var sock mangos.Socket
|
var err error
|
if sock,err = req.NewSocket();err !=nil {
|
logger.Debug("comp cache can't new req socket:%s",err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://"+config.DbPersonCompare.Url
|
if err = sock.Dial(url);err !=nil {
|
logger.Debug("comp cache can't dial on req socket:%s",err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
|
//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
|
if err = sock.Send(b);err !=nil {
|
logger.Debug("comp cache can't send message on push socket:%s",err.Error())
|
return
|
}
|
if msg,err := sock.Recv();err !=nil {
|
logger.Debug("comp cache sock.Recv receive err:%s",err.Error())
|
return
|
} else {
|
logger.Debug("comp cache sock.Recv msg:",string(msg))
|
}
|
sock.Close()
|
}
|