package models import ( sdb "basic.com/syncdb.git" "dbserver/extend/config" "dbserver/extend/logger" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/req" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) var Agent *sdb.Agent var SyncTables = []string{ "area", "camera_area", "cameras", "cluster", "cluster_node", "dictionary", "gb28181_config" } var syncSdkCompareCacheChan = make(chan []byte,0) func PublishSdkCompareCacheChange(b []byte) { syncSdkCompareCacheChan <- b } func InitAgent() { sdb.InitLocalDb(GetDB()) var lc LocalConfig if lc.Select() != nil || lc.ServerId == "" { return } var clusterE Cluster clusters, err := clusterE.FindAll() if err ==nil && clusters !=nil && len(clusters) >0 { c := clusters[0] var nodeE Node nodes, e := nodeE.FindNodesByClusterId(c.ClusterId) if e == nil && nodes !=nil && len(nodes) >0 { var nodeIps []string for _,n :=range nodes { if n.NodeId != lc.ServerId { nodeIps = append(nodeIps, n.NodeIp) } } Agent,err = sdb.Init(c.ClusterId, c.Password, lc.ServerId, nodeIps) if Agent != nil { logger.Debug("sync.Agent init success!") } else { logger.Debug("sync.Agent init fail!") } } } go func() { for { select { case scMsg := <-sdb.SyncDbTablePersonCacheChan:{ PublishSdkCompareCacheChange(scMsg) } default: } } }() go func() { for { select { case b := <-syncSdkCompareCacheChan:{ logger.Debug("SyncSdkCompareCache in,len(b):",len(b)) doUpdateCompareCacheRequest(b) } default: } } }() } func doUpdateCompareCacheRequest(b []byte) { logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg") var sock mangos.Socket var err error if sock,err = req.NewSocket();err !=nil { logger.Debug("comp cache can't new req socket:%s",err.Error()) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) var url = "tcp://"+config.DbPersonCompare.Url if err = sock.Dial(url);err !=nil { logger.Debug("comp cache can't dial on req socket:%s",err.Error()) return } sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100) //sock.SetOption(mangos.OptionRecvDeadline, time.Second*10) if err = sock.Send(b);err !=nil { logger.Debug("comp cache can't send message on push socket:%s",err.Error()) return } if msg,err := sock.Recv();err !=nil { logger.Debug("comp cache sock.Recv receive err:%s",err.Error()) return } else { logger.Debug("comp cache sock.Recv msg:",string(msg)) } sock.Close() }