package serf
|
|
import (
|
"basic.com/valib/bhomeclient.git"
|
"basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/serf"
|
"encoding/json"
|
"errors"
|
"github.com/golang/protobuf/proto"
|
"reflect"
|
"strconv"
|
"sync"
|
"time"
|
"vamicro/config"
|
"vamicro/extend/util"
|
"vamicro/system-service/bhome_msg_dev"
|
)
|
|
// RequestSerfTopicMsg describes one request that is relayed through serf.
//
// Topic and Msg are the only fields encoding/json serialises. The two
// unexported fields are in-process options read by QuerySerfNodes.
type RequestSerfTopicMsg struct {
	// Topic is the query name handed to the serf agent (and the request path
	// when the message is forwarded to a local process).
	Topic string `json:"topic"`
	// Msg is the opaque request payload.
	Msg []byte `json:"msg"`

	// targetNodeIds is passed to serf as the node filter of the query.
	// It deliberately has no json tag: encoding/json never touches unexported
	// fields, so a tag here would have no effect and is reported by go vet.
	targetNodeIds []string
	// timeout bounds how long QuerySerfNodes keeps collecting responses.
	timeout time.Duration
}
|
|
type RequestSerfTopicResp struct {
|
DevId string `json:"devId"`
|
DevIp string `json:"devIp"`
|
Topic string `json:"topic"`
|
Procs []TopicProc `json:"procs"`
|
}
|
|
type TopicProc struct {
|
ProcId string `json:"procId"`
|
Addr bhome_msg.BHAddress `json:"addr"`
|
}
|
|
//通过topic获取指定topic的所有节点,供bus使用
|
func GetProcsInClusterByTopic(topic string) ([]RequestSerfTopicResp, error) {
|
req := RequestSerfTopicMsg{
|
Topic: QueryNodesByTopic,
|
timeout: time.Second * 10,
|
}
|
err, data := QuerySerfNodes(req)
|
if err != nil {
|
return nil, err
|
}
|
var result []RequestSerfTopicResp
|
if len(data) > 0 {
|
for _, d := range data {
|
var arr []RequestSerfTopicResp
|
if err = json.Unmarshal(d, &arr); err != nil {
|
logger.Debug("json Unmarshal err:", err)
|
} else {
|
result = append(result, arr...)
|
}
|
}
|
}
|
|
return result, nil
|
}
|
|
//响应根据topic获取地址
|
func ResponseGetNodeByTopic(topic string) ([]RequestSerfTopicResp, error) {
|
clients := hms.GetLocalNetNodeByTopic(topic)
|
if len(clients) == 0 {
|
return nil, errors.New("topic not exist")
|
} else {
|
ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
|
var destArr []TopicProc
|
for idx, c := range clients {
|
tp := TopicProc{
|
ProcId: string(c.ProcId),
|
Addr: *(clients[idx].Addr),
|
}
|
//if c. != nil {
|
// for _,p := range cli.Service.TopicList {
|
// if topic == string(p) {
|
// tps = append(tps, TopicProc{
|
// ProcId: string(cli.Proc.ProcId),
|
// ProcName: string(cli.Proc.Name),
|
// })
|
// }
|
// }
|
//}
|
destArr = append(destArr, tp)
|
}
|
r := append([]RequestSerfTopicResp{}, RequestSerfTopicResp{
|
Topic: topic,
|
DevId: config.Server.AnalyServerId,
|
DevIp: ipv4,
|
Procs: destArr,
|
})
|
return r, nil
|
}
|
}
|
|
//代理请求,通过当前的serf发送给其他的serf节点
|
func QuerySerfNodes(req RequestSerfTopicMsg) (error, [][]byte) {
|
resp, err := Agent.Query(req.Topic, req.Msg, &serf.QueryParam{
|
FilterNodes: req.targetNodeIds,
|
})
|
if err != nil {
|
return err, nil
|
}
|
|
var data [][]byte
|
var retE error
|
after := time.After(req.timeout)
|
loop:
|
for {
|
select {
|
case r := <-resp.ResponseCh():
|
data = append(data, r.Payload)
|
case <-after:
|
//retE = errors.New("time out")
|
break loop
|
}
|
}
|
|
return retE, data
|
}
|
|
//serf接收到query消息后需要访问本地进程获得结果并反馈
|
func QueryLocalProc(req RequestSerfTopicMsg) (error, interface{}) {
|
br := bhomeclient.Request{
|
Path: req.Topic,
|
Body: req.Msg,
|
}
|
reply, err := hms.RequestTopic(config.Server.AnalyServerId, br, 5000)
|
if err != nil {
|
return err, nil
|
}
|
return nil, reply
|
}
|
|
var (
|
riPool map[string]bhome_msg_dev.MsgDevRegisterInfo
|
topicPubAllRegisterInfo = "pub-allRegisterInfo-to-center"
|
riLock sync.Mutex
|
)
|
|
func init() {
|
riPool = make(map[string]bhome_msg_dev.MsgDevRegisterInfo)
|
}
|
|
//从serf中获取注册中心的注册信息是否变化,并将集群中所有注册信息汇总
|
func DoSyncRegisterInfo() error {
|
if hms == nil || Agent == nil {
|
return errors.New("ms or Agent handle is nil")
|
}
|
infos, err := hms.GetRegisteredClient()
|
if err != nil {
|
return err
|
}
|
ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
|
ri := bhome_msg_dev.MsgDevRegisterInfo{
|
DevId: []byte(config.Server.AnalyServerId),
|
Ip: []byte(ipv4),
|
ProxyPort: 4020,
|
}
|
for _, i := range infos {
|
if i.Online {
|
mta := bhome_msg_dev.MsgProcTopicsAll{
|
Online: i.Online,
|
}
|
if i.Proc != nil {
|
mta.Proc = &bhome_msg_dev.ProcInfo{
|
ProcId: i.Proc.ProcId,
|
Name: i.Proc.Name,
|
PublicInfo: i.Proc.PublicInfo,
|
PrivateInfo: i.Proc.PrivateInfo,
|
}
|
//proc的端口信息放在PublicInfo中
|
if string(mta.Proc.PublicInfo) != "" {
|
if iPort, e := strconv.Atoi(string(mta.Proc.PublicInfo)); e == nil && iPort > 0 && iPort < 65535 {
|
mta.Port = int32(iPort)
|
}
|
}
|
}
|
if i.Topics != nil {
|
mta.TopicList = &bhome_msg_dev.MsgTopicList{
|
TopicList: i.Topics.TopicList,
|
}
|
}
|
if i.SubLocalTopics != nil {
|
mta.LocalSublist = &bhome_msg_dev.MsgTopicList{
|
TopicList: i.SubLocalTopics.TopicList,
|
}
|
}
|
if i.SubNetTopics != nil {
|
mta.NetSublist = &bhome_msg_dev.MsgTopicList{
|
TopicList: i.SubNetTopics.TopicList,
|
}
|
}
|
ri.ProcInfos = append(ri.ProcInfos, &mta)
|
}
|
}
|
|
riLock.Lock()
|
bs := false //判断是否更改,如果发生更改则需要重新发布到center中
|
if v, ok := riPool[string(ri.DevId)]; !ok {
|
bs = true
|
riPool[string(ri.DevId)] = ri
|
} else {
|
if !reflect.DeepEqual(v, ri) {
|
bs = true
|
riPool[string(ri.DevId)] = ri
|
}
|
}
|
riLock.Unlock()
|
|
if bs {
|
bts, err := proto.Marshal(&ri)
|
if err != nil {
|
logger.Debug("marshal riPool err:", err)
|
return err
|
}
|
|
logger.Debug("UserEventSyncRegisterInfo len(bts):", len(bts))
|
Agent.UserEvent(UserEventSyncRegisterInfo, bts, false)
|
}
|
return nil
|
}
|
|
func compareRPool(ri *bhome_msg_dev.MsgDevRegisterInfo) {
|
logger.Debug("Enter compareRPool")
|
riLock.Lock()
|
riPool[string(ri.DevId)] = *ri
|
riLock.Unlock()
|
|
b, _ := proto.Marshal(ri)
|
|
logger.Debug("before hms.Publish to topicPubAllRegisterInfo")
|
err := hms.Publish(topicPubAllRegisterInfo, b)
|
if err != nil {
|
logger.Error("hms.Publish to topicPubAllRegisterInfo err:", err)
|
} else {
|
logger.Debug("hms.Publish success")
|
}
|
}
|
|
func SyncProxy(topic string, body []byte) error {
|
return Agent.UserEvent(topic, body, false)
|
}
|