liuxiaolong
2021-04-25 c7069befa28a0f2594f0746044318a30d6989c19
micronode.go
@@ -1,7 +1,7 @@
package bhomeclient
import (
   "basic.com/valib/bhomebus.git"
   "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
   "context"
   "encoding/json"
   "errors"
@@ -20,7 +20,7 @@
   serverId    string
   fnLog       func(...interface{})
   SubCh       chan *MsgInfo
   SubCh       chan *bhome_msg.MsgPublish
   mtx       sync.Mutex
   started    bool
@@ -39,7 +39,7 @@
      reg:      reg,
      procInfo: &reg.Proc,
      fnLog:    fnLog,
      SubCh:    make(chan *MsgInfo, 512),
      SubCh:    make(chan *bhome_msg.MsgPublish, 512),
   }
   return mn, nil
@@ -65,14 +65,6 @@
}
func (ms *MicroNode) startHeartbeat() {
   hbi := &HeartBeatInfo{
      HealthLevel: "health",
      Fps:         12,
      WarnInfo:    "warn",
      ErrorInfo:   "error",
      Proc:    *ms.procInfo,
   }
   t := time.NewTicker(1 * time.Second)
   defer t.Stop()
@@ -81,7 +73,9 @@
      case <-ms.ctx.Done():
         return
      case <-t.C:
         ms.handle.HeartBeat(hbi)
         ms.handle.HeartBeat()
      default:
         time.Sleep(500 * time.Millisecond)
      }
   }
}
@@ -110,52 +104,18 @@
         select {
         case <- ms.ctx.Done():
            return
         case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息
            go ms.serve(ms.handle.ctx, &msgR)
         case msgS := <-ms.handle.ChSub:
            ms.printLog("Recv Sub Message:", string(msgS.Data))
            ms.SubCh <- &msgS
         default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
               //收到其它进程的发布消息
               ms.printLog("Recv Sub Message:", string(msgS.Body))
               ms.SubCh <- msgS
            }
            if msgR != nil {
               //收到其它进程的请求消息
               go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
         }
      }
      //接收订阅到的消息
      //go ms.startRecvSubMsg()
      //作为server启动
      //ms.serve()
   }
   ms.mtx.Unlock()
}
//开始接收订阅消息
//func (ms *MicroNode) startRecvSubMsg() {
//   for {
//      select {
//      case <- ms.ctx.Done():
//         return
//      default:
//         msgS, msgR, keyR := ms.handle.GetMsg()
//         if msgS != nil {
//            //收到其它进程的发布消息
//            ms.printLog("Recv Sub Message:", string(msgS.Body))
//            ms.SubCh <- msgS
//         }
//         if msgR != nil {
//            //收到其它进程的请求消息
//            go ms.serve(msgR, keyR)
//         }
//
//         time.Sleep(50 * time.Millisecond)
//      }
//   }
//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
   t := time.Now()
@@ -163,9 +123,9 @@
   ms.printLog("1:", time.Since(t))
   t = time.Now()
   rb, _ := json.Marshal(request)
   msgR := &MsgInfo {
      Topic: request.Path,
      Body: rb,
   msgR := &bhome_msg.MsgRequestTopic{
      Topic: []byte(request.Path),
      Data: rb,
   }
   ms.printLog("2:", time.Since(t))
   return ms.handle.Request(serverId, msgR, milliSecs)
@@ -173,20 +133,20 @@
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
   rb, _ := json.Marshal(request)
   msgR := &MsgInfo{
      Topic: request.Path,
      Body: rb,
   msgR := &bhome_msg.MsgRequestTopic{
      Topic: []byte(request.Path),
      Data: rb,
   }
   return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
   return ms.handle.RequestOnly(rData, nodes)
func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) {
   return ms.handle.RequestOnly(req, dest)
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
   if err != nil {
      ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
@@ -196,7 +156,7 @@
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
   netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
   if err != nil {
      return nil
@@ -205,11 +165,7 @@
}
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
   r := MsgInfo{
      SrcProc: *ms.procInfo,
      MsgType: MesgType_ReqRep,
      Topic: TOPIC_QUERYPROC,
   }
   r := bhome_msg.MsgRequestTopic{}
   cr, err := ms.handle.RequestCenter(&r)
   if err != nil {
      ms.printLog("requestCenter reply:", cr, "err:", err)
@@ -234,76 +190,37 @@
   return nil, fmt.Errorf("GetRegisteredClient list failed")
}
//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
//   ri := &Reply{}
//   if ms.handlers == nil {
//      ri.Msg = "send wrong addr, check yourself!!!"
//   } else {
//      var msgR MsgInfo
//      err := json.Unmarshal(rdata, &msgR)
//      if err != nil {
//         ri.Msg = err.Error()
//      } else {
//         var reqBody Request
//         err = json.Unmarshal(rdata, &msgR.Body)
//         if err != nil {
//            ri.Msg = err.Error()
//         } else {
//            ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
//            if f,ok := ms.handlers[reqBody.Path];ok {
//               reqBody.SrcProc = msgR.SrcProc
//               ri = f(&reqBody)
//               ms.printLog("call funcMap f,reply:", *ri)
//            } else {
//               ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
//               ri.Msg = "请求的接口不存在,请检查url"
//            }
//         }
//      }
//   }
//   result, err := json.Marshal(*ri)
//   if err != nil {
//      sdata = nil
//   } else {
//      sdata = &result
//   }
//   return ri.Success
//}
//func (ms *MicroNode) serve() {
//   if ms.handlers == nil {
//      return
//   }
//   for i:=0;i<10;i++ {
//      ms.handle.wg.Add(1)
//      go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
//   }
//}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
   if ms.handlers == nil {
      return
   }
   var reqBody Request
   var ri *Reply
   err := json.Unmarshal(msgR.Body, &reqBody)
   err := json.Unmarshal(msgR.Data, &reqBody)
   if err != nil {
      ms.printLog("serve unmarshal msgR.Body err:", err)
      ri = &Reply {
         Msg: err.Error(),
      }
   } else {
      ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
      ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
      if f,ok := ms.handlers[reqBody.Path];ok {
         reqBody.SrcProc = msgR.SrcProc
         reqBody.SrcProc = ProcInfo{
            ID: msgR.ProcId,
         }
         h := WrapperHandler{
            ms,
            ms,
         }
         ri = f(&h, &reqBody)
         ms.printLog("call funcMap f,reply.Success:", ri.Success)
         select {
         case <-ctx.Done():
            ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
         default:
            ri = f(&h, &reqBody)
            ms.printLog("call funcMap f,reply.Success:", ri.Success)
         }
      } else {
         ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
         ri = &Reply{
@@ -314,7 +231,7 @@
      }
   }
   retErr := ms.handle.Reply(p, ri)
   retErr := ms.handle.Reply(msgR.Src, ri)
   if retErr != nil {
      ms.printLog("retErr:", retErr)
   }
@@ -322,24 +239,22 @@
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
   nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: 8,
   })
   var nodes []bhome_msg.BHAddress
   return ms.PublishNet(nodes, topic, msg)
}
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
   pi := &MsgInfo{
      Topic: topic,
      Body: msg,
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
   pi := &bhome_msg.MsgPublish{
      Topic: []byte(topic),
      Data: data,
   }
   return ms.handle.Pub(nodes, pi)
}
func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
   pi := &MsgInfo{
      Topic: topic,
      Body: msg,
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
   pi := &bhome_msg.MsgPublish{
      Topic: []byte(topic),
      Data: data,
   }
   return ms.handle.PubTimeout(nodes, pi, timeout)
}