package serf
|
|
import (
|
"basic.com/pubsub/protomsg.git"
|
sdb "basic.com/syncdb.git"
|
"basic.com/valib/bhomeclient.git"
|
"basic.com/valib/logger.git"
|
"context"
|
"github.com/gogo/protobuf/proto"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
"strconv"
|
"time"
|
"vamicro/config"
|
"vamicro/system-service/models"
|
"vamicro/system-service/sys"
|
)
|
|
var Agent *sdb.Agent
|
var hms *bhomeclient.MicroNode
|
|
func InitBusH(ms *bhomeclient.MicroNode) {
|
hms = ms
|
}
|
|
func GetBusHandle() *bhomeclient.MicroNode {
|
return hms
|
}
|
|
// syncSdkCompareCacheChan hands compare-cache update payloads from
// PublishSdkCompareCacheChange to the worker goroutine started by InitAgent.
// It is unbuffered, so every send waits for that worker to receive.
var syncSdkCompareCacheChan = make(chan []byte)
|
|
func PublishSdkCompareCacheChange(b []byte) {
|
syncSdkCompareCacheChan <- b
|
}
|
func InitAgent(ctx context.Context) {
|
|
//sdb.InitLocalDb(models.GetDB())
|
|
go RawReceiveTcpMsg()
|
|
if config.Server.AnalyServerId == "" {
|
return
|
}
|
var clusterE models.Cluster
|
clusters, err := clusterE.FindAll()
|
if err == nil && clusters != nil && len(clusters) > 0 {
|
c := clusters[0]
|
var nodeE models.Node
|
nodes, e := nodeE.FindNodesByClusterId(c.ClusterId)
|
if e == nil && nodes != nil && len(nodes) > 0 {
|
var nodeIps []string
|
for _, n := range nodes {
|
if n.NodeId != config.Server.AnalyServerId {
|
nodeIps = append(nodeIps, n.NodeIp)
|
}
|
}
|
|
conf := sdb.DefaultConfig()
|
conf.Ctx = ctx
|
localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
|
|
if localIp != "" {
|
conf.BindAddr = localIp
|
}
|
|
Agent, err = sdb.Init(c.ClusterId, c.Password, config.Server.AnalyServerId, nodeIps, config.ClusterSet.SerfSnapShotPath, conf)
|
if Agent != nil {
|
Agent.RegisterHandleEventFunc(HandleSerfEvent)
|
logger.Debug("sync.Agent init success!")
|
} else {
|
logger.Debug("sync.Agent init fail!")
|
}
|
}
|
}
|
go func() {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case scMsg := <-SyncDbTablePersonCacheChan:
|
{
|
PublishSdkCompareCacheChange(scMsg)
|
}
|
default:
|
time.Sleep(500 * time.Millisecond)
|
}
|
}
|
}()
|
go func() {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case b := <-SyncVirtualIpChan:
|
{
|
var msg protomsg.DbChangeMessage
|
if ue := proto.Unmarshal(b, &msg); ue == nil {
|
hms.Publish(bhomeclient.Proc_System_Service, b)
|
} else {
|
logger.Error("unmarshal SyncVirtualIp msg err:", ue)
|
}
|
}
|
default:
|
time.Sleep(50 * time.Millisecond)
|
}
|
}
|
}()
|
go func() {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case b := <-syncSdkCompareCacheChan:
|
{
|
logger.Debug("SyncSdkCompareCache in,len(b):", len(b))
|
doUpdateCompareCacheRequest(b)
|
}
|
default:
|
time.Sleep(500 * time.Millisecond)
|
}
|
}
|
}()
|
go func() {
|
for {
|
DoSyncRegisterInfo()
|
time.Sleep(10 * time.Second)
|
}
|
}()
|
}
|
|
func doUpdateCompareCacheRequest(b []byte) {
|
logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")
|
var sock mangos.Socket
|
var err error
|
if sock, err = req.NewSocket(); err != nil {
|
logger.Debug("comp cache can't new req socket:%s", err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
|
if err = sock.Dial(url); err != nil {
|
logger.Debug("comp cache can't dial on req socket:%s", err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
|
//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
|
if err = sock.Send(b); err != nil {
|
logger.Debug("comp cache can't send message on push socket:%s", err.Error())
|
return
|
}
|
if msg, err := sock.Recv(); err != nil {
|
logger.Debug("comp cache sock.Recv receive err:%s", err.Error())
|
return
|
} else {
|
logger.Debug("comp cache sock.Recv msg:", string(msg))
|
}
|
sock.Close()
|
}
|
|
func ReInitDbPersonCompareData() {
|
logger.Debug("加入集群后重新初始化底库比对数据")
|
var sock mangos.Socket
|
var err error
|
if sock, err = req.NewSocket(); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't new req socket:%s", err.Error())
|
return
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
var url = "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
|
if err = sock.Dial(url); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't dial on req socket:%s", err.Error())
|
return
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024)
|
//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
|
msg := protomsg.CompareEvent{
|
EventType: protomsg.CompareEventType_ReInitCache,
|
Payload: []byte(""),
|
}
|
b, err := proto.Marshal(&msg)
|
if err != nil {
|
logger.Debug("ReInitDbPersonCompareData msg marshal err", err)
|
}
|
if err = sock.Send(b); err != nil {
|
logger.Debug("ReInitDbPersonCompareData can't send message on push socket:%s", err.Error())
|
return
|
}
|
if msg, err := sock.Recv(); err != nil {
|
logger.Debug("ReInitDbPersonCompareData sock.Recv receive err:%s", err.Error())
|
return
|
} else {
|
logger.Debug("ReInitDbPersonCompareData sock.Recv msg:", string(msg))
|
}
|
sock.Close()
|
}
|