zhangmeng
2023-12-06 6f73c447e25fa0b593f77338791051984990640c
micronode.go
@@ -12,28 +12,28 @@
)
type MicroNode struct {
   ctx       context.Context
   handle       *BHBus
   reg       *RegisterInfo
   procInfo    *ProcInfo
   handlers    map[string]MicroFunc
   serverId    string
   fnLog       func(...interface{})
   ctx      context.Context
   handle   *BHBus
   reg      *RegisterInfo
   procInfo *ProcInfo
   handlers map[string]MicroFunc
   serverId string
   fnLog    func(...interface{})
   SubCh       chan *bhome_msg.MsgPublish
   SubCh chan *bhome_msg.MsgPublish
   mtx       sync.Mutex
   started    bool
   mtx     sync.Mutex
   started bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,60000,60000,2000, fnLog)
func NewMicroNode(ctx context.Context, q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error) {
   conf := NewConfig(KEY_REGISTER, 512, 5, 60000, 60000, 2000, fnLog)
   handle, err := Register(ctx, q, conf, reg)
   if err != nil {
      return nil, err
   }
   mn := &MicroNode {
      ctx: ctx,
   mn := &MicroNode{
      ctx:      ctx,
      serverId: serverId,
      handle:   handle,
      reg:      reg,
@@ -76,8 +76,6 @@
         return
      case <-t.C:
         h.HeartBeat()
      default:
         time.Sleep(500 * time.Millisecond)
      }
   }
}
@@ -96,21 +94,19 @@
      for {
         select {
         case <- ms.ctx.Done():
         case <-ms.ctx.Done():
            return
         case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息
            go ms.serve(ms.handle.ctx, &msgR)
         case msgS := <-ms.handle.ChSub:
            ms.SubCh <- &msgS
         default:
            time.Sleep(50 * time.Millisecond)
         }
      }
   }
   ms.mtx.Unlock()
}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply, error) {
   t := time.Now()
   ms.printLog("1:", time.Since(t))
@@ -118,17 +114,17 @@
   rb, _ := json.Marshal(request)
   msgR := &bhome_msg.MsgRequestTopic{
      Topic: []byte(request.Path),
      Data: rb,
      Data:  rb,
   }
   ms.printLog("2:", time.Since(t))
   return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply, error) {
   rb, _ := json.Marshal(request)
   msgR := &bhome_msg.MsgRequestTopic{
      Topic: []byte(request.Path),
      Data: rb,
      Data:  rb,
   }
   return ms.handle.Request(serverId, msgR, milliSecs)
@@ -142,7 +138,7 @@
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []*bhome_msg.MsgQueryTopicReply_BHNodeAddress {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
   if err != nil {
      ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
      ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err)
      return nil
   }
   return netNodes
@@ -157,7 +153,7 @@
//   return netNodes
//}
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info,error) {
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
   return ms.handle.RequestCenter()
}
@@ -171,13 +167,13 @@
   err := json.Unmarshal(msgR.Data, &reqBody)
   if err != nil {
      ms.printLog("serve unmarshal msgR.Body err:", err)
      ri = &Reply {
      ri = &Reply{
         Msg: err.Error(),
      }
   } else {
      ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
      ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:", reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
      if f,ok := ms.handlers[reqBody.Path];ok {
      if f, ok := ms.handlers[reqBody.Path]; ok {
         reqBody.SrcProc = ProcInfo{
            ID: msgR.ProcId,
         }
@@ -196,8 +192,8 @@
         ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
         ri = &Reply{
            Success: false,
            Msg: "请求的接口不存在,请检查url",
            Data: "请求的接口不存在,请检查url",
            Msg:     "请求的接口不存在,请检查url",
            Data:    "请求的接口不存在,请检查url",
         }
      }
   }
@@ -211,15 +207,15 @@
}
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
func (ms *MicroNode) Publish(topic string, msg []byte) error {
   var nodes []bhome_msg.BHAddress
   return ms.PublishNet(nodes, topic, msg)
}
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string, data []byte) error {
   pi := &bhome_msg.MsgPublish{
      Topic: []byte(topic),
      Data: data,
      Data:  data,
   }
   return ms.handle.Pub(nodes, pi)
}
@@ -227,7 +223,7 @@
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
   pi := &bhome_msg.MsgPublish{
      Topic: []byte(topic),
      Data: data,
      Data:  data,
   }
   return ms.handle.PubTimeout(nodes, pi, timeout)
}
@@ -235,12 +231,12 @@
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
   ms.handle.Sub(topics)
   for _,t := range topics {
   for _, t := range topics {
      if ms.reg.SubTopic == nil {
         ms.reg.SubTopic = make([]string, 0)
      }
      found := false
      for _,it := range ms.reg.SubTopic {
      for _, it := range ms.reg.SubTopic {
         if it == t {
            found = true
            break
@@ -258,9 +254,9 @@
   ms.handle.DeSub(topics)
   if ms.reg.SubTopic != nil {
      var leftTopics []string
      for _,t := range ms.reg.SubTopic {
      for _, t := range ms.reg.SubTopic {
         found := false
         for _,it := range topics {
         for _, it := range topics {
            if it == t {
               found = true
               break