// Package serf wires the syncdb cluster agent to the local message bus and
// forwards cache-change notifications to the person-compare service.
package serf

import (
	"basic.com/pubsub/protomsg.git"
	sdb "basic.com/syncdb.git"
	"basic.com/valib/bhomeclient.git"
	"basic.com/valib/logger.git"
	"context"
	"encoding/json"
	"github.com/gogo/protobuf/proto"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
	"strconv"
	"time"
	"vamicro/config"
	"vamicro/system-service/models"
	"vamicro/system-service/sys"
)

// Agent is the cluster sync agent created by InitAgent. It stays nil until
// sdb.Init has returned a non-nil agent.
var Agent *sdb.Agent

// hms is the bus node used for publishing; it is set through InitBusH.
var hms *bhomeclient.MicroNode

// InitBusH stores the bus node that this package publishes through.
func InitBusH(ms *bhomeclient.MicroNode) {
	hms = ms
}

// GetBusHandle returns the bus node previously stored by InitBusH (nil if
// InitBusH has not been called).
func GetBusHandle() *bhomeclient.MicroNode {
	return hms
}

// syncSdkCompareCacheChan hands compare-cache payloads to the worker goroutine
// started by InitAgent. It is unbuffered, so a send blocks until received.
var syncSdkCompareCacheChan = make(chan []byte)

// PublishSdkCompareCacheChange queues b for delivery to the person-compare
// service. The call blocks until the worker started by InitAgent takes the
// payload; if that worker is not running the call never returns.
func PublishSdkCompareCacheChange(b []byte) {
	syncSdkCompareCacheChan <- b
}

// InitAgent starts the raw TCP receiver and, when this server has an
// AnalyServerId configured, joins the first stored cluster and launches the
// background loops that relay sync messages to the bus and to the
// person-compare service.
//
// All loops started here stop once ctx is cancelled.
func InitAgent(ctx context.Context) {
	//sdb.InitLocalDb(models.GetDB())
	go RawReceiveTcpMsg()

	// Without a server id there is no cluster identity to act on.
	if config.Server.AnalyServerId == "" {
		return
	}

	var clusterE models.Cluster
	clusters, err := clusterE.FindAll()
	if err == nil && len(clusters) > 0 {
		c := clusters[0]

		var nodeE models.Node
		nodes, e := nodeE.FindNodesByClusterId(c.ClusterId)
		if e == nil && len(nodes) > 0 {
			// Every node except this one is a join target; the entry that
			// matches our id is kept as the local node record.
			var nodeIps []string
			for idx, n := range nodes {
				if n.NodeId != config.Server.AnalyServerId {
					nodeIps = append(nodeIps, n.NodeIp)
				} else {
					nodeE = nodes[idx]
				}
			}

			conf := sdb.DefaultConfig()
			conf.Ctx = ctx
			// Best effort: when the adapter lookup yields no address the
			// default bind address from sdb.DefaultConfig is kept.
			localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
			if localIp != "" {
				conf.BindAddr = localIp
			}

			Agent, err = sdb.Init(c.ClusterId, c.Password, config.Server.AnalyServerId, nodeIps, config.ClusterSet.SerfSnapShotPath, conf)
			if Agent != nil {
				Agent.RegisterHandleEventFunc(HandleSerfEvent)
				logger.Debugf("local node: %+v", nodeE)

				if nodeE.DriftState == "slave" {
					chMsg := protomsg.DbChangeMessage{
						Id:     nodeE.ClusterId,
						Table:  protomsg.TableChanged_T_Cluster,
						Action: protomsg.DbAction_Insert,
						Info:   "master2slave",
					}
					if b, me := json.Marshal(chMsg); me != nil {
						logger.Error("marshal master2slave msg err:", me)
					} else if pe := hms.Publish(bhomeclient.Proc_System_Service, b); pe != nil {
						logger.Error(pe.Error())
					}
				}

				logger.Debug("sync.Agent init success!")
			} else {
				logger.Debug("sync.Agent init fail!")
				if err != nil {
					logger.Error("sync.Agent init err:", err)
				}
			}
		}
	}

	// every runs fn immediately and then again interval after each run
	// finishes, until ctx is cancelled.
	every := func(interval time.Duration, fn func()) {
		for ctx.Err() == nil {
			fn()
			select {
			case <-ctx.Done():
				return
			case <-time.After(interval):
			}
		}
	}

	go every(5*time.Second, func() { HandleUpdateMemberStatus() })

	// Forward person-cache sync payloads to the compare-cache worker.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case scMsg := <-SyncDbTablePersonCacheChan:
				// The hand-off is unbuffered; also watch ctx so a busy
				// worker cannot pin this goroutine after cancellation.
				select {
				case syncSdkCompareCacheChan <- scMsg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	// Republish virtual-IP change messages on the system-service topic,
	// dropping payloads that do not decode as a DbChangeMessage.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case b := <-SyncVirtualIpChan:
				var msg protomsg.DbChangeMessage
				if ue := proto.Unmarshal(b, &msg); ue != nil {
					logger.Error("unmarshal SyncVirtualIp msg err:", ue)
					continue
				}
				if pe := hms.Publish(bhomeclient.Proc_System_Service, b); pe != nil {
					logger.Error("hms.Publish error ", pe.Error())
				}
			}
		}
	}()

	// Republish process messages on the topic named inside each event.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case b := <-SyncProcMessageChan:
				var procMsg ProcMessageEvent
				if ue := json.Unmarshal(b, &procMsg); ue != nil {
					logger.Error("Unmarshal ProcMessageEvent ", ue.Error())
					continue
				}
				if pe := hms.Publish(procMsg.Topic, procMsg.Payload); pe != nil {
					logger.Error("hms.Publish error ", pe.Error())
				}
			}
		}
	}()

	// Deliver queued compare-cache payloads to the person-compare service,
	// one request at a time.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case b := <-syncSdkCompareCacheChan:
				logger.Debug("SyncSdkCompareCache in,len(b):", len(b))
				doUpdateCompareCacheRequest(b)
			}
		}
	}()

	go every(10*time.Second, func() { DoSyncRegisterInfo() })
}

// doUpdateCompareCacheRequest sends b to the person-compare service over a
// fresh REQ socket and waits for the reply. Failures are logged and the
// request is dropped; the socket is closed on every path.
//
// No receive deadline is set, so the call blocks until a reply arrives.
func doUpdateCompareCacheRequest(b []byte) {
	logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")

	sock, err := req.NewSocket()
	if err != nil {
		logger.Debugf("comp cache can't new req socket:%s", err.Error())
		return
	}
	defer sock.Close()

	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	// Set the limit before dialing so the connection made by Dial uses it
	// (NOTE(review): mangos appears to read this option when the pipe is
	// created — confirm against the library version in use).
	if oe := sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100); oe != nil {
		logger.Debugf("comp cache can't set max recv size:%s", oe.Error())
	}
	//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)

	url := "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
	if err = sock.Dial(url); err != nil {
		logger.Debugf("comp cache can't dial on req socket:%s", err.Error())
		return
	}

	if err = sock.Send(b); err != nil {
		logger.Debugf("comp cache can't send message on push socket:%s", err.Error())
		return
	}

	reply, err := sock.Recv()
	if err != nil {
		logger.Debugf("comp cache sock.Recv receive err:%s", err.Error())
		return
	}
	logger.Debug("comp cache sock.Recv msg:", string(reply))
}

// ReInitDbPersonCompareData asks the person-compare service to rebuild its
// cache by sending a ReInitCache compare event and waiting for the reply.
// Failures are logged and the request is abandoned; the socket is closed on
// every path.
func ReInitDbPersonCompareData() {
	// Log text: "re-initialise the person database compare data after
	// joining the cluster".
	logger.Debug("加入集群后重新初始化底库比对数据")

	sock, err := req.NewSocket()
	if err != nil {
		logger.Debugf("ReInitDbPersonCompareData can't new req socket:%s", err.Error())
		return
	}
	defer sock.Close()

	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	// Set the limit before dialing so the connection made by Dial uses it
	// (NOTE(review): confirm against the mangos version in use).
	if oe := sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024); oe != nil {
		logger.Debugf("ReInitDbPersonCompareData can't set max recv size:%s", oe.Error())
	}
	//sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)

	url := "tcp://" + config.DbPersonCompare.Ip + ":" + strconv.Itoa(config.DbPersonCompare.Port)
	if err = sock.Dial(url); err != nil {
		logger.Debugf("ReInitDbPersonCompareData can't dial on req socket:%s", err.Error())
		return
	}

	event := protomsg.CompareEvent{
		EventType: protomsg.CompareEventType_ReInitCache,
		Payload:   []byte(""),
	}
	b, err := proto.Marshal(&event)
	if err != nil {
		// Nothing valid to send; do not push an empty request.
		logger.Debug("ReInitDbPersonCompareData msg marshal err", err)
		return
	}

	if err = sock.Send(b); err != nil {
		logger.Debugf("ReInitDbPersonCompareData can't send message on push socket:%s", err.Error())
		return
	}

	reply, err := sock.Recv()
	if err != nil {
		logger.Debugf("ReInitDbPersonCompareData sock.Recv receive err:%s", err.Error())
		return
	}
	logger.Debug("ReInitDbPersonCompareData sock.Recv msg:", string(reply))
}