zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/serf/proxy.go
@@ -1,246 +1,246 @@
package serf
import (
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "errors"
   "github.com/golang/protobuf/proto"
   "reflect"
   "strconv"
   "sync"
   "time"
   "vamicro/config"
   "vamicro/extend/util"
   "vamicro/system-service/bhome_msg_dev"
)
type RequestSerfTopicMsg struct {
   Topic         string   `json:"topic"`
   Msg           []byte   `json:"msg"`
   targetNodeIds []string `json:"targetNodeIds"`
   timeout       time.Duration
}
type RequestSerfTopicResp struct {
   DevId string      `json:"devId"`
   DevIp string      `json:"devIp"`
   Topic string      `json:"topic"`
   Procs []TopicProc `json:"procs"`
}
type TopicProc struct {
   ProcId string              `json:"procId"`
   Addr   bhome_msg.BHAddress `json:"addr"`
}
//通过topic获取指定topic的所有节点,供bus使用
func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) {
   req := RequestSerfTopicMsg{
      Topic:   QueryNodesByTopic,
      timeout: time.Second * 10,
   }
   err, data := QuerySerfNodes(req)
   if err != nil {
      return nil, err
   }
   var result []RequestSerfTopicResp
   if len(data) > 0 {
      for _, d := range data {
         var arr []RequestSerfTopicResp
         if err = json.Unmarshal(d, &arr); err != nil {
            logger.Debug("json Unmarshal err:", err)
         } else {
            result = append(result, arr...)
         }
      }
   }
   return result, nil
}
//响应根据topic获取地址
func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) {
   clients := hms.GetLocalNetNodeByTopic(topic)
   if len(clients) == 0 {
      return nil, errors.New("topic not exist")
   } else {
      ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
      var destArr []TopicProc
      for idx, c := range clients {
         tp := TopicProc{
            ProcId: string(c.ProcId),
            Addr:   *(clients[idx].Addr),
         }
         //if c. != nil {
         //   for _,p := range cli.Service.TopicList {
         //      if topic == string(p) {
         //         tps = append(tps, TopicProc{
         //            ProcId: string(cli.Proc.ProcId),
         //            ProcName: string(cli.Proc.Name),
         //         })
         //      }
         //   }
         //}
         destArr = append(destArr, tp)
      }
      r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{
         Topic: topic,
         DevId: config.Server.AnalyServerId,
         DevIp: ipv4,
         Procs: destArr,
      })
      return r, nil
   }
}
//代理请求,通过当前的serf发送给其他的serf节点
func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) {
   resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{
      FilterNodes: req.targetNodeIds,
   })
   if err != nil {
      return err, nil
   }
   var data [][]byte
   var retE error
   after := time.After(req.timeout)
loop:
   for {
      select {
      case r := <-resp.ResponseCh():
         data = append(data, r.Payload)
      case <-after:
         //retE = errors.New("time out")
         break loop
      }
   }
   return retE, data
}
//serf接收到query消息后需要访问本地进程获得结果并反馈
func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) {
   br := bhomeclient.Request{
      Path: req.Topic,
      Body: req.Msg,
   }
   reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000)
   if err != nil {
      return err, nil
   }
   return nil, reply
}
var (
   riPool                  map[string]bhome_msg_dev.MsgDevRegisterInfo
   topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center"
   riLock                  sync.Mutex
)
func init() {
   riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo)
}
//从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总
func DoSyncRegisterInfo() error {
   if hms == nil || Agent == nil {
      return errors.New("ms or Agent handle is nil")
   }
   infos, err := hms.GetRegisteredClient()
   if err != nil {
      return err
   }
   ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
   ri := bhome_msg_dev.MsgDevRegisterInfo{
      DevId:     []byte(config.Server.AnalyServerId),
      Ip:        []byte(ipv4),
      ProxyPort: 4020,
   }
   for _, i := range infos {
      if i.Online {
         mta := bhome_msg_dev.MsgProcTopicsAll{
            Online: i.Online,
         }
         if i.Proc != nil {
            mta.Proc = &bhome_msg_dev.ProcInfo{
               ProcId:      i.Proc.ProcId,
               Name:        i.Proc.Name,
               PublicInfo:  i.Proc.PublicInfo,
               PrivateInfo: i.Proc.PrivateInfo,
            }
            //proc的端口信息放在PublicInfo中
            if string(mta.Proc.PublicInfo) != "" {
               if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 {
                  mta.Port = int32(iPort)
               }
            }
         }
         if i.Topics != nil {
            mta.TopicList = &bhome_msg_dev.MsgTopicList{
               TopicList: i.Topics.TopicList,
            }
         }
         if i.SubLocalTopics != nil {
            mta.LocalSublist = &bhome_msg_dev.MsgTopicList{
               TopicList: i.SubLocalTopics.TopicList,
            }
         }
         if i.SubNetTopics != nil {
            mta.NetSublist = &bhome_msg_dev.MsgTopicList{
               TopicList: i.SubNetTopics.TopicList,
            }
         }
         ri.ProcInfos = append(ri.ProcInfos, &mta)
      }
   }
   riLock.Lock()
   bs := false //判断是否更改,如果发生更改则需要重新发布到center中
   if v, ok := riPool[string(ri.DevId)]; !ok {
      bs = true
      riPool[string(ri.DevId)] = ri
   } else {
      if !reflect.DeepEqual(v, ri) {
         bs = true
         riPool[string(ri.DevId)] = ri
      }
   }
   riLock.Unlock()
   if bs {
      bts, err := proto.Marshal(&ri)
      if err != nil {
         logger.Debug("marshal riPool err:", err)
         return err
      }
      logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts))
      Agent.UserEvent(UserEventSyncRegisterInfo, bts, false)
   }
   return nil
}
func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) {
   logger.Debug("Enter compareRPool")
   riLock.Lock()
   riPool[string(ri.DevId)] = *ri
   riLock.Unlock()
   b, _ := proto.Marshal(ri)
   logger.Debug("before hms.Publish to topicPubAllRegisterInfo")
   err := hms.Publish(topicPubAllRegisterInfo, b)
   if err != nil {
      logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err)
   } else {
      logger.Debug("hms.Publish success")
   }
}
func SyncProxy(topic string, body []byte) error {
   return Agent.UserEvent(topic, body, false)
}
package serf
import (
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
   "basic.com/valib/logger.git"
   "basic.com/valib/serf.git/serf"
   "encoding/json"
   "errors"
   "github.com/golang/protobuf/proto"
   "reflect"
   "strconv"
   "sync"
   "time"
   "vamicro/config"
   "vamicro/extend/util"
   "vamicro/system-service/bhome_msg_dev"
)
type RequestSerfTopicMsg struct {
   Topic         string   `json:"topic"`
   Msg           []byte   `json:"msg"`
   targetNodeIds []string `json:"targetNodeIds"`
   timeout       time.Duration
}
type RequestSerfTopicResp struct {
   DevId string      `json:"devId"`
   DevIp string      `json:"devIp"`
   Topic string      `json:"topic"`
   Procs []TopicProc `json:"procs"`
}
type TopicProc struct {
   ProcId string              `json:"procId"`
   Addr   bhome_msg.BHAddress `json:"addr"`
}
//通过topic获取指定topic的所有节点,供bus使用
func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) {
   req := RequestSerfTopicMsg{
      Topic:   QueryNodesByTopic,
      timeout: time.Second * 10,
   }
   err, data := QuerySerfNodes(req)
   if err != nil {
      return nil, err
   }
   var result []RequestSerfTopicResp
   if len(data) > 0 {
      for _, d := range data {
         var arr []RequestSerfTopicResp
         if err = json.Unmarshal(d, &arr); err != nil {
            logger.Debug("json Unmarshal err:", err)
         } else {
            result = append(result, arr...)
         }
      }
   }
   return result, nil
}
//响应根据topic获取地址
func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) {
   clients := hms.GetLocalNetNodeByTopic(topic)
   if len(clients) == 0 {
      return nil, errors.New("topic not exist")
   } else {
      ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
      var destArr []TopicProc
      for idx, c := range clients {
         tp := TopicProc{
            ProcId: string(c.ProcId),
            Addr:   *(clients[idx].Addr),
         }
         //if c. != nil {
         //   for _,p := range cli.Service.TopicList {
         //      if topic == string(p) {
         //         tps = append(tps, TopicProc{
         //            ProcId: string(cli.Proc.ProcId),
         //            ProcName: string(cli.Proc.Name),
         //         })
         //      }
         //   }
         //}
         destArr = append(destArr, tp)
      }
      r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{
         Topic: topic,
         DevId: config.Server.AnalyServerId,
         DevIp: ipv4,
         Procs: destArr,
      })
      return r, nil
   }
}
//代理请求,通过当前的serf发送给其他的serf节点
func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) {
   resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{
      FilterNodes: req.targetNodeIds,
   })
   if err != nil {
      return err, nil
   }
   var data [][]byte
   var retE error
   after := time.After(req.timeout)
loop:
   for {
      select {
      case r := <-resp.ResponseCh():
         data = append(data, r.Payload)
      case <-after:
         //retE = errors.New("time out")
         break loop
      }
   }
   return retE, data
}
//serf接收到query消息后需要访问本地进程获得结果并反馈
func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) {
   br := bhomeclient.Request{
      Path: req.Topic,
      Body: req.Msg,
   }
   reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000)
   if err != nil {
      return err, nil
   }
   return nil, reply
}
var (
   riPool                  map[string]bhome_msg_dev.MsgDevRegisterInfo
   topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center"
   riLock                  sync.Mutex
)
func init() {
   riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo)
}
//从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总
func DoSyncRegisterInfo() error {
   if hms == nil || Agent == nil {
      return errors.New("ms or Agent handle is nil")
   }
   infos, err := hms.GetRegisteredClient()
   if err != nil {
      return err
   }
   ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
   ri := bhome_msg_dev.MsgDevRegisterInfo{
      DevId:     []byte(config.Server.AnalyServerId),
      Ip:        []byte(ipv4),
      ProxyPort: 4020,
   }
   for _, i := range infos {
      if i.Online {
         mta := bhome_msg_dev.MsgProcTopicsAll{
            Online: i.Online,
         }
         if i.Proc != nil {
            mta.Proc = &bhome_msg_dev.ProcInfo{
               ProcId:      i.Proc.ProcId,
               Name:        i.Proc.Name,
               PublicInfo:  i.Proc.PublicInfo,
               PrivateInfo: i.Proc.PrivateInfo,
            }
            //proc的端口信息放在PublicInfo中
            if string(mta.Proc.PublicInfo) != "" {
               if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 {
                  mta.Port = int32(iPort)
               }
            }
         }
         if i.Topics != nil {
            mta.TopicList = &bhome_msg_dev.MsgTopicList{
               TopicList: i.Topics.TopicList,
            }
         }
         if i.SubLocalTopics != nil {
            mta.LocalSublist = &bhome_msg_dev.MsgTopicList{
               TopicList: i.SubLocalTopics.TopicList,
            }
         }
         if i.SubNetTopics != nil {
            mta.NetSublist = &bhome_msg_dev.MsgTopicList{
               TopicList: i.SubNetTopics.TopicList,
            }
         }
         ri.ProcInfos = append(ri.ProcInfos, &mta)
      }
   }
   riLock.Lock()
   bs := false //判断是否更改,如果发生更改则需要重新发布到center中
   if v, ok := riPool[string(ri.DevId)]; !ok {
      bs = true
      riPool[string(ri.DevId)] = ri
   } else {
      if !reflect.DeepEqual(v, ri) {
         bs = true
         riPool[string(ri.DevId)] = ri
      }
   }
   riLock.Unlock()
   if bs {
      bts, err := proto.Marshal(&ri)
      if err != nil {
         logger.Debug("marshal riPool err:", err)
         return err
      }
      logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts))
      Agent.UserEvent(UserEventSyncRegisterInfo, bts, false)
   }
   return nil
}
func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) {
   logger.Debug("Enter compareRPool")
   riLock.Lock()
   riPool[string(ri.DevId)] = *ri
   riLock.Unlock()
   b, _ := proto.Marshal(ri)
   logger.Debug("before hms.Publish to topicPubAllRegisterInfo")
   err := hms.Publish(topicPubAllRegisterInfo, b)
   if err != nil {
      logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err)
   } else {
      logger.Debug("hms.Publish success")
   }
}
func SyncProxy(topic string, body []byte) error {
   return Agent.UserEvent(topic, body, false)
}