liuxiaolong
2021-01-08 44ed992c5d12b985c474c877a68439d5e1d77e3a
micronode.go
@@ -19,7 +19,7 @@
   serverId    string
   fnLog       func(...interface{})
   SubChM       map[string]chan *MsgInfo //以订阅的主题为key
   SubCh       chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
@@ -29,15 +29,13 @@
      return nil, err
   }
   mn := &MicroNode {
      ctx: ctx,
      serverId: serverId,
      handle:   handle,
      reg:      reg,
      procInfo: procInfo,
      fnLog: fnLog,
      SubChM:   make(map[string]chan *MsgInfo),
   }
   for _,subTopic := range reg.SubTopic {
      mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
      fnLog:    fnLog,
      SubCh:    make(chan *MsgInfo, 512),
   }
   return mn, nil
@@ -102,9 +100,7 @@
         if msgS != nil {
            //收到其它进程的发布消息
            ms.printLog("Recv Sub Message:", string(msgS.Body))
            if ch,ok := ms.SubChM[msgS.Topic];ok {
               ch <- msgS
            }
            ms.SubCh <- msgS
         }
         if msgR != nil {
            //收到其它进程的请求消息
@@ -174,17 +170,18 @@
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
   if err != nil {
      ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
      return nil
   }
   return netNodes
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
   if err != nil {
      return nil
   }
@@ -199,6 +196,7 @@
   }
   cr, err := ms.handle.RequestCenter(&r)
   if err != nil {
      ms.printLog("requestCenter reply:", cr, "err:", err)
      return nil, err
   }
   if cr.Status == REPLY_SUCCESS && cr.Body != nil {
@@ -216,10 +214,15 @@
}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
   if ms.handlers == nil {
      return
   }
   var reqBody Request
   err := json.Unmarshal(msgR.Body, &reqBody)
   if err != nil {
      ms.printLog("serve unmarshal msgR.Body err:", err)
      return
   }
   ms.printLog("reqBody:", reqBody)
@@ -259,8 +262,46 @@
   return ms.handle.Pub(nodes, pi)
}
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
   ms.handle.Sub(topics)
   for _,t := range topics {
      if ms.reg.SubTopic == nil {
         ms.reg.SubTopic = make([]string, 0)
      }
      found := false
      for _,it := range ms.reg.SubTopic {
         if it == t {
            found = true
            break
         }
      }
      if !found {
         ms.reg.SubTopic = append(ms.reg.SubTopic, t)
      }
   }
}
//取消订阅的主题
func (ms *MicroNode) DeSub(topics []string) {
   ms.printLog("DeSub topics:", topics)
   ms.handle.DeSub(topics)
   if ms.reg.SubTopic != nil {
      var leftTopics []string
      for _,t := range ms.reg.SubTopic {
         found := false
         for _,it := range topics {
            if it == t {
               found = true
               break
            }
         }
         if !found {
            leftTopics = append(leftTopics, t)
         }
      }
      ms.reg.SubTopic = leftTopics
   }
}
//free handle