| | |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | |
| | | SubChM map[string]chan *MsgInfo //以订阅的主题为key |
| | | SubCh chan *MsgInfo |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ |
| | |
| | | return nil, err |
| | | } |
| | | mn := &MicroNode { |
| | | ctx: ctx, |
| | | serverId: serverId, |
| | | handle: handle, |
| | | reg: reg, |
| | | procInfo: procInfo, |
| | | fnLog: fnLog, |
| | | SubChM: make(map[string]chan *MsgInfo), |
| | | } |
| | | for _,subTopic := range reg.SubTopic { |
| | | mn.SubChM[subTopic] = make(chan *MsgInfo, 512) |
| | | fnLog: fnLog, |
| | | SubCh: make(chan *MsgInfo, 512), |
| | | } |
| | | |
| | | return mn, nil |
| | |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | if ch,ok := ms.SubChM[msgS.Topic];ok { |
| | | ch <- msgS |
| | | } |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | |
| | | } |
| | | |
| | | //获取本机中某一个主题的 key (结果只有一个元素) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) |
| | | if err != nil { |
| | | ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) |
| | | return nil |
| | | } |
| | | return netNodes |
| | | } |
| | | |
| | | //获取集群中所有节点某个主题的key信息, (结果可能有多个) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | |
| | | } |
| | | cr, err := ms.handle.RequestCenter(&r) |
| | | if err != nil { |
| | | ms.printLog("requestCenter reply:", cr, "err:", err) |
| | | return nil, err |
| | | } |
| | | if cr.Status == REPLY_SUCCESS && cr.Body != nil { |
| | |
| | | } |
| | | |
| | | func (ms *MicroNode) serve(msgR *MsgInfo, p int) { |
| | | if ms.handlers == nil { |
| | | return |
| | | } |
| | | |
| | | var reqBody Request |
| | | err := json.Unmarshal(msgR.Body, &reqBody) |
| | | if err != nil { |
| | | ms.printLog("serve unmarshal msgR.Body err:", err) |
| | | return |
| | | } |
| | | |
| | | ms.printLog("reqBody:", reqBody) |
| | |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | | |
| | | //订阅主题 |
| | | func (ms *MicroNode) Subscribe(topics []string) { |
| | | ms.handle.Sub(topics) |
| | | for _,t := range topics { |
| | | if ms.reg.SubTopic == nil { |
| | | ms.reg.SubTopic = make([]string, 0) |
| | | } |
| | | found := false |
| | | for _,it := range ms.reg.SubTopic { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | ms.reg.SubTopic = append(ms.reg.SubTopic, t) |
| | | } |
| | | } |
| | | } |
| | | |
| | | //取消订阅的主题 |
| | | func (ms *MicroNode) DeSub(topics []string) { |
| | | ms.printLog("DeSub topics:", topics) |
| | | ms.handle.DeSub(topics) |
| | | if ms.reg.SubTopic != nil { |
| | | var leftTopics []string |
| | | for _,t := range ms.reg.SubTopic { |
| | | found := false |
| | | for _,it := range topics { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | leftTopics = append(leftTopics, t) |
| | | } |
| | | } |
| | | ms.reg.SubTopic = leftTopics |
| | | } |
| | | } |
| | | |
| | | //free handle |