package serf import ( "basic.com/valib/bhomeclient.git" "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg" "basic.com/valib/logger.git" "basic.com/valib/serf.git/serf" "encoding/json" "errors" "github.com/golang/protobuf/proto" "reflect" "strconv" "sync" "time" "vamicro/config" "vamicro/extend/util" "vamicro/system-service/bhome_msg_dev" ) type RequestSerfTopicMsg struct { Topic string `json:"topic"` Msg []byte `json:"msg"` targetNodeIds []string `json:"targetNodeIds"` timeout time.Duration } type RequestSerfTopicResp struct { DevId string `json:"devId"` DevIp string `json:"devIp"` Topic string `json:"topic"` Procs []TopicProc `json:"procs"` } type TopicProc struct { ProcId string `json:"procId"` Addr bhome_msg.BHAddress `json:"addr"` } //通过topic获取指定topic的所有节点,供bus使用 func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) { req := RequestSerfTopicMsg{ Topic: QueryNodesByTopic, timeout: time.Second * 10, } err, data := QuerySerfNodes(req) if err != nil { return nil, err } var result []RequestSerfTopicResp if len(data) > 0 { for _, d := range data { var arr []RequestSerfTopicResp if err = json.Unmarshal(d, &arr); err != nil { logger.Debug("json Unmarshal err:", err) } else { result = append(result, arr...) } } } return result, nil } //响应根据topic获取地址 func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) { clients := hms.GetLocalNetNodeByTopic(topic) if len(clients) == 0 { return nil, errors.New("topic not exist") } else { ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) var destArr []TopicProc for idx, c := range clients { tp := TopicProc{ ProcId: string(c.ProcId), Addr: *(clients[idx].Addr), } //if c. != nil { // for _,p := range cli.Service.TopicList { // if topic == string(p) { // tps = append(tps, TopicProc{ // ProcId: string(cli.Proc.ProcId), // ProcName: string(cli.Proc.Name), // }) // } // } //} destArr = append(destArr, tp) } r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{ Topic: topic, DevId: config.Server.AnalyServerId, DevIp: ipv4, Procs: destArr, }) return r, nil } } //代理请求,通过当前的serf发送给其他的serf节点 func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) { resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{ FilterNodes: req.targetNodeIds, }) if err != nil { return err, nil } var data [][]byte var retE error after := time.After(req.timeout) loop: for { select { case r := <-resp.ResponseCh(): data = append(data, r.Payload) case <-after: //retE = errors.New("time out") break loop } } return retE, data } //serf接收到query消息后需要访问本地进程获得结果并反馈 func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) { br := bhomeclient.Request{ Path: req.Topic, Body: req.Msg, } reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000) if err != nil { return err, nil } return nil, reply } var ( riPool map[string]bhome_msg_dev.MsgDevRegisterInfo topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center" riLock sync.Mutex ) func init() { riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo) } //从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总 func DoSyncRegisterInfo() error { if hms == nil || Agent == nil { return errors.New("ms or Agent handle is nil") } infos, err := hms.GetRegisteredClient() if err != nil { return err } ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) ri := bhome_msg_dev.MsgDevRegisterInfo{ DevId: []byte(config.Server.AnalyServerId), Ip: []byte(ipv4), ProxyPort: 4020, } for _, i := range infos { if i.Online { mta := bhome_msg_dev.MsgProcTopicsAll{ Online: i.Online, } if i.Proc != nil { mta.Proc = &bhome_msg_dev.ProcInfo{ ProcId: i.Proc.ProcId, Name: i.Proc.Name, PublicInfo: i.Proc.PublicInfo, PrivateInfo: i.Proc.PrivateInfo, } //proc的端口信息放在PublicInfo中 if string(mta.Proc.PublicInfo) != "" { if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 { mta.Port = int32(iPort) } } } if i.Topics != nil { mta.TopicList = &bhome_msg_dev.MsgTopicList{ TopicList: i.Topics.TopicList, } } if i.SubLocalTopics != nil { mta.LocalSublist = &bhome_msg_dev.MsgTopicList{ TopicList: i.SubLocalTopics.TopicList, } } if i.SubNetTopics != nil { mta.NetSublist = &bhome_msg_dev.MsgTopicList{ TopicList: i.SubNetTopics.TopicList, } } ri.ProcInfos = append(ri.ProcInfos, &mta) } } riLock.Lock() bs := false //判断是否更改,如果发生更改则需要重新发布到center中 if v, ok := riPool[string(ri.DevId)]; !ok { bs = true riPool[string(ri.DevId)] = ri } else { if !reflect.DeepEqual(v, ri) { bs = true riPool[string(ri.DevId)] = ri } } riLock.Unlock() if bs { bts, err := proto.Marshal(&ri) if err != nil { logger.Debug("marshal riPool err:", err) return err } logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts)) Agent.UserEvent(UserEventSyncRegisterInfo, bts, false) } return nil } func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) { logger.Debug("Enter compareRPool") riLock.Lock() riPool[string(ri.DevId)] = *ri riLock.Unlock() b, _ := proto.Marshal(ri) logger.Debug("before hms.Publish to topicPubAllRegisterInfo") err := hms.Publish(topicPubAllRegisterInfo, b) if err != nil { logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err) } else { logger.Debug("hms.Publish success") } } func SyncProxy(topic string, body []byte) error { return Agent.UserEvent(topic, body, false) }