| | |
| | | package serf |
| | | |
| | | import ( |
| | | "basic.com/valib/bhomeclient.git" |
| | | "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg" |
| | | "basic.com/valib/logger.git" |
| | | "basic.com/valib/serf.git/serf" |
| | | "encoding/json" |
| | | "errors" |
| | | "github.com/golang/protobuf/proto" |
| | | "reflect" |
| | | "strconv" |
| | | "sync" |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/extend/util" |
| | | "vamicro/system-service/bhome_msg_dev" |
| | | ) |
| | | |
// RequestSerfTopicMsg describes one request that is relayed through serf.
type RequestSerfTopicMsg struct {
	// Topic is used as the serf query name by QuerySerfNodes and as the
	// request path by QueryLocalProc.
	Topic string `json:"topic"`
	// Msg is the opaque payload sent along with the request.
	Msg []byte `json:"msg"`
	// targetNodeIds is handed to serf as the node filter of the query.
	// The field is unexported, so encoding/json never reads or writes it;
	// a json tag on it has no effect and is reported by go vet (structtag),
	// which is why it carries none.
	targetNodeIds []string
	// timeout limits how long QuerySerfNodes keeps collecting responses.
	timeout time.Duration
}
| | | |
| | | type RequestSerfTopicResp struct { |
| | | DevId string `json:"devId"` |
| | | DevIp string `json:"devIp"` |
| | | Topic string `json:"topic"` |
| | | Procs []TopicProc `json:"procs"` |
| | | } |
| | | |
| | | type TopicProc struct { |
| | | ProcId string `json:"procId"` |
| | | Addr bhome_msg.BHAddress `json:"addr"` |
| | | } |
| | | |
| | | //通过topic获取指定topic的所有节点,供bus使用 |
| | | func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) { |
| | | req := RequestSerfTopicMsg{ |
| | | Topic: QueryNodesByTopic, |
| | | timeout: time.Second * 10, |
| | | } |
| | | err, data := QuerySerfNodes(req) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var result []RequestSerfTopicResp |
| | | if len(data) > 0 { |
| | | for _, d := range data { |
| | | var arr []RequestSerfTopicResp |
| | | if err = json.Unmarshal(d, &arr); err != nil { |
| | | logger.Debug("json Unmarshal err:", err) |
| | | } else { |
| | | result = append(result, arr...) |
| | | } |
| | | } |
| | | } |
| | | |
| | | return result, nil |
| | | } |
| | | |
| | | //响应根据topic获取地址 |
| | | func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) { |
| | | clients := hms.GetLocalNetNodeByTopic(topic) |
| | | if len(clients) == 0 { |
| | | return nil, errors.New("topic not exist") |
| | | } else { |
| | | ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | var destArr []TopicProc |
| | | for idx, c := range clients { |
| | | tp := TopicProc{ |
| | | ProcId: string(c.ProcId), |
| | | Addr: *(clients[idx].Addr), |
| | | } |
| | | //if c. != nil { |
| | | // for _,p := range cli.Service.TopicList { |
| | | // if topic == string(p) { |
| | | // tps = append(tps, TopicProc{ |
| | | // ProcId: string(cli.Proc.ProcId), |
| | | // ProcName: string(cli.Proc.Name), |
| | | // }) |
| | | // } |
| | | // } |
| | | //} |
| | | destArr = append(destArr, tp) |
| | | } |
| | | r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{ |
| | | Topic: topic, |
| | | DevId: config.Server.AnalyServerId, |
| | | DevIp: ipv4, |
| | | Procs: destArr, |
| | | }) |
| | | return r, nil |
| | | } |
| | | } |
| | | |
| | | //代理请求,通过当前的serf发送给其他的serf节点 |
| | | func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) { |
| | | resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{ |
| | | FilterNodes: req.targetNodeIds, |
| | | }) |
| | | if err != nil { |
| | | return err, nil |
| | | } |
| | | |
| | | var data [][]byte |
| | | var retE error |
| | | after := time.After(req.timeout) |
| | | loop: |
| | | for { |
| | | select { |
| | | case r := <-resp.ResponseCh(): |
| | | data = append(data, r.Payload) |
| | | case <-after: |
| | | //retE = errors.New("time out") |
| | | break loop |
| | | } |
| | | } |
| | | |
| | | return retE, data |
| | | } |
| | | |
| | | //serf接收到query消息后需要访问本地进程获得结果并反馈 |
| | | func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) { |
| | | br := bhomeclient.Request{ |
| | | Path: req.Topic, |
| | | Body: req.Msg, |
| | | } |
| | | reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000) |
| | | if err != nil { |
| | | return err, nil |
| | | } |
| | | return nil, reply |
| | | } |
| | | |
| | | var ( |
| | | riPool map[string]bhome_msg_dev.MsgDevRegisterInfo |
| | | topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center" |
| | | riLock sync.Mutex |
| | | ) |
| | | |
| | | func init() { |
| | | riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo) |
| | | } |
| | | |
| | | //从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总 |
| | | func DoSyncRegisterInfo() error { |
| | | if hms == nil || Agent == nil { |
| | | return errors.New("ms or Agent handle is nil") |
| | | } |
| | | infos, err := hms.GetRegisteredClient() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | ri := bhome_msg_dev.MsgDevRegisterInfo{ |
| | | DevId: []byte(config.Server.AnalyServerId), |
| | | Ip: []byte(ipv4), |
| | | ProxyPort: 4020, |
| | | } |
| | | for _, i := range infos { |
| | | if i.Online { |
| | | mta := bhome_msg_dev.MsgProcTopicsAll{ |
| | | Online: i.Online, |
| | | } |
| | | if i.Proc != nil { |
| | | mta.Proc = &bhome_msg_dev.ProcInfo{ |
| | | ProcId: i.Proc.ProcId, |
| | | Name: i.Proc.Name, |
| | | PublicInfo: i.Proc.PublicInfo, |
| | | PrivateInfo: i.Proc.PrivateInfo, |
| | | } |
| | | //proc的端口信息放在PublicInfo中 |
| | | if string(mta.Proc.PublicInfo) != "" { |
| | | if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 { |
| | | mta.Port = int32(iPort) |
| | | } |
| | | } |
| | | } |
| | | if i.Topics != nil { |
| | | mta.TopicList = &bhome_msg_dev.MsgTopicList{ |
| | | TopicList: i.Topics.TopicList, |
| | | } |
| | | } |
| | | if i.SubLocalTopics != nil { |
| | | mta.LocalSublist = &bhome_msg_dev.MsgTopicList{ |
| | | TopicList: i.SubLocalTopics.TopicList, |
| | | } |
| | | } |
| | | if i.SubNetTopics != nil { |
| | | mta.NetSublist = &bhome_msg_dev.MsgTopicList{ |
| | | TopicList: i.SubNetTopics.TopicList, |
| | | } |
| | | } |
| | | ri.ProcInfos = append(ri.ProcInfos, &mta) |
| | | } |
| | | } |
| | | |
| | | riLock.Lock() |
| | | bs := false //判断是否更改,如果发生更改则需要重新发布到center中 |
| | | if v, ok := riPool[string(ri.DevId)]; !ok { |
| | | bs = true |
| | | riPool[string(ri.DevId)] = ri |
| | | } else { |
| | | if !reflect.DeepEqual(v, ri) { |
| | | bs = true |
| | | riPool[string(ri.DevId)] = ri |
| | | } |
| | | } |
| | | riLock.Unlock() |
| | | |
| | | if bs { |
| | | bts, err := proto.Marshal(&ri) |
| | | if err != nil { |
| | | logger.Debug("marshal riPool err:", err) |
| | | return err |
| | | } |
| | | |
| | | logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts)) |
| | | Agent.UserEvent(UserEventSyncRegisterInfo, bts, false) |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) { |
| | | logger.Debug("Enter compareRPool") |
| | | riLock.Lock() |
| | | riPool[string(ri.DevId)] = *ri |
| | | riLock.Unlock() |
| | | |
| | | b, _ := proto.Marshal(ri) |
| | | |
| | | logger.Debug("before hms.Publish to topicPubAllRegisterInfo") |
| | | err := hms.Publish(topicPubAllRegisterInfo, b) |
| | | if err != nil { |
| | | logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err) |
| | | } else { |
| | | logger.Debug("hms.Publish success") |
| | | } |
| | | } |
| | | |
| | | func SyncProxy(topic string, body []byte) error { |
| | | return Agent.UserEvent(topic, body, false) |
| | | } |
| | | package serf
|
| | |
|
| | | import (
|
| | | "basic.com/valib/bhomeclient.git"
|
| | | "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
|
| | | "basic.com/valib/logger.git"
|
| | | "basic.com/valib/serf.git/serf"
|
| | | "encoding/json"
|
| | | "errors"
|
| | | "github.com/golang/protobuf/proto"
|
| | | "reflect"
|
| | | "strconv"
|
| | | "sync"
|
| | | "time"
|
| | | "vamicro/config"
|
| | | "vamicro/extend/util"
|
| | | "vamicro/system-service/bhome_msg_dev"
|
| | | )
|
| | |
|
// RequestSerfTopicMsg describes one request that is relayed through serf.
type RequestSerfTopicMsg struct {
	// Topic is used as the serf query name by QuerySerfNodes and as the
	// request path by QueryLocalProc.
	Topic string `json:"topic"`
	// Msg is the opaque payload sent along with the request.
	Msg []byte `json:"msg"`
	// targetNodeIds is handed to serf as the node filter of the query.
	// The field is unexported, so encoding/json never reads or writes it;
	// a json tag on it has no effect and is reported by go vet (structtag),
	// which is why it carries none.
	targetNodeIds []string
	// timeout limits how long QuerySerfNodes keeps collecting responses.
	timeout time.Duration
}
|
| | |
|
| | | type RequestSerfTopicResp struct {
|
| | | DevId string `json:"devId"`
|
| | | DevIp string `json:"devIp"`
|
| | | Topic string `json:"topic"`
|
| | | Procs []TopicProc `json:"procs"`
|
| | | }
|
| | |
|
| | | type TopicProc struct {
|
| | | ProcId string `json:"procId"`
|
| | | Addr bhome_msg.BHAddress `json:"addr"`
|
| | | }
|
| | |
|
| | | //通过topic获取指定topic的所有节点,供bus使用
|
| | | func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) {
|
| | | req := RequestSerfTopicMsg{
|
| | | Topic: QueryNodesByTopic,
|
| | | timeout: time.Second * 10,
|
| | | }
|
| | | err, data := QuerySerfNodes(req)
|
| | | if err != nil {
|
| | | return nil, err
|
| | | }
|
| | | var result []RequestSerfTopicResp
|
| | | if len(data) > 0 {
|
| | | for _, d := range data {
|
| | | var arr []RequestSerfTopicResp
|
| | | if err = json.Unmarshal(d, &arr); err != nil {
|
| | | logger.Debug("json Unmarshal err:", err)
|
| | | } else {
|
| | | result = append(result, arr...)
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | return result, nil
|
| | | }
|
| | |
|
| | | //响应根据topic获取地址
|
| | | func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) {
|
| | | clients := hms.GetLocalNetNodeByTopic(topic)
|
| | | if len(clients) == 0 {
|
| | | return nil, errors.New("topic not exist")
|
| | | } else {
|
| | | ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
|
| | | var destArr []TopicProc
|
| | | for idx, c := range clients {
|
| | | tp := TopicProc{
|
| | | ProcId: string(c.ProcId),
|
| | | Addr: *(clients[idx].Addr),
|
| | | }
|
| | | //if c. != nil {
|
| | | // for _,p := range cli.Service.TopicList {
|
| | | // if topic == string(p) {
|
| | | // tps = append(tps, TopicProc{
|
| | | // ProcId: string(cli.Proc.ProcId),
|
| | | // ProcName: string(cli.Proc.Name),
|
| | | // })
|
| | | // }
|
| | | // }
|
| | | //}
|
| | | destArr = append(destArr, tp)
|
| | | }
|
| | | r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{
|
| | | Topic: topic,
|
| | | DevId: config.Server.AnalyServerId,
|
| | | DevIp: ipv4,
|
| | | Procs: destArr,
|
| | | })
|
| | | return r, nil
|
| | | }
|
| | | }
|
| | |
|
| | | //代理请求,通过当前的serf发送给其他的serf节点
|
| | | func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) {
|
| | | resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{
|
| | | FilterNodes: req.targetNodeIds,
|
| | | })
|
| | | if err != nil {
|
| | | return err, nil
|
| | | }
|
| | |
|
| | | var data [][]byte
|
| | | var retE error
|
| | | after := time.After(req.timeout)
|
| | | loop:
|
| | | for {
|
| | | select {
|
| | | case r := <-resp.ResponseCh():
|
| | | data = append(data, r.Payload)
|
| | | case <-after:
|
| | | //retE = errors.New("time out")
|
| | | break loop
|
| | | }
|
| | | }
|
| | |
|
| | | return retE, data
|
| | | }
|
| | |
|
| | | //serf接收到query消息后需要访问本地进程获得结果并反馈
|
| | | func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) {
|
| | | br := bhomeclient.Request{
|
| | | Path: req.Topic,
|
| | | Body: req.Msg,
|
| | | }
|
| | | reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000)
|
| | | if err != nil {
|
| | | return err, nil
|
| | | }
|
| | | return nil, reply
|
| | | }
|
| | |
|
| | | var (
|
| | | riPool map[string]bhome_msg_dev.MsgDevRegisterInfo
|
| | | topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center"
|
| | | riLock sync.Mutex
|
| | | )
|
| | |
|
| | | func init() {
|
| | | riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo)
|
| | | }
|
| | |
|
| | | //从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总
|
| | | func DoSyncRegisterInfo() error {
|
| | | if hms == nil || Agent == nil {
|
| | | return errors.New("ms or Agent handle is nil")
|
| | | }
|
| | | infos, err := hms.GetRegisteredClient()
|
| | | if err != nil {
|
| | | return err
|
| | | }
|
| | | ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
|
| | | ri := bhome_msg_dev.MsgDevRegisterInfo{
|
| | | DevId: []byte(config.Server.AnalyServerId),
|
| | | Ip: []byte(ipv4),
|
| | | ProxyPort: 4020,
|
| | | }
|
| | | for _, i := range infos {
|
| | | if i.Online {
|
| | | mta := bhome_msg_dev.MsgProcTopicsAll{
|
| | | Online: i.Online,
|
| | | }
|
| | | if i.Proc != nil {
|
| | | mta.Proc = &bhome_msg_dev.ProcInfo{
|
| | | ProcId: i.Proc.ProcId,
|
| | | Name: i.Proc.Name,
|
| | | PublicInfo: i.Proc.PublicInfo,
|
| | | PrivateInfo: i.Proc.PrivateInfo,
|
| | | }
|
| | | //proc的端口信息放在PublicInfo中
|
| | | if string(mta.Proc.PublicInfo) != "" {
|
| | | if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 {
|
| | | mta.Port = int32(iPort)
|
| | | }
|
| | | }
|
| | | }
|
| | | if i.Topics != nil {
|
| | | mta.TopicList = &bhome_msg_dev.MsgTopicList{
|
| | | TopicList: i.Topics.TopicList,
|
| | | }
|
| | | }
|
| | | if i.SubLocalTopics != nil {
|
| | | mta.LocalSublist = &bhome_msg_dev.MsgTopicList{
|
| | | TopicList: i.SubLocalTopics.TopicList,
|
| | | }
|
| | | }
|
| | | if i.SubNetTopics != nil {
|
| | | mta.NetSublist = &bhome_msg_dev.MsgTopicList{
|
| | | TopicList: i.SubNetTopics.TopicList,
|
| | | }
|
| | | }
|
| | | ri.ProcInfos = append(ri.ProcInfos, &mta)
|
| | | }
|
| | | }
|
| | |
|
| | | riLock.Lock()
|
| | | bs := false //判断是否更改,如果发生更改则需要重新发布到center中
|
| | | if v, ok := riPool[string(ri.DevId)]; !ok {
|
| | | bs = true
|
| | | riPool[string(ri.DevId)] = ri
|
| | | } else {
|
| | | if !reflect.DeepEqual(v, ri) {
|
| | | bs = true
|
| | | riPool[string(ri.DevId)] = ri
|
| | | }
|
| | | }
|
| | | riLock.Unlock()
|
| | |
|
| | | if bs {
|
| | | bts, err := proto.Marshal(&ri)
|
| | | if err != nil {
|
| | | logger.Debug("marshal riPool err:", err)
|
| | | return err
|
| | | }
|
| | |
|
| | | logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts))
|
| | | Agent.UserEvent(UserEventSyncRegisterInfo, bts, false)
|
| | | }
|
| | | return nil
|
| | | }
|
| | |
|
| | | func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) {
|
| | | logger.Debug("Enter compareRPool")
|
| | | riLock.Lock()
|
| | | riPool[string(ri.DevId)] = *ri
|
| | | riLock.Unlock()
|
| | |
|
| | | b, _ := proto.Marshal(ri)
|
| | |
|
| | | logger.Debug("before hms.Publish to topicPubAllRegisterInfo")
|
| | | err := hms.Publish(topicPubAllRegisterInfo, b)
|
| | | if err != nil {
|
| | | logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err)
|
| | | } else {
|
| | | logger.Debug("hms.Publish success")
|
| | | }
|
| | | }
|
| | |
|
| | | func SyncProxy(topic string, body []byte) error {
|
| | | return Agent.UserEvent(topic, body, false)
|
| | | }
|