liuxiaolong
2021-02-05 81afb6ffbf7f76f49644a1832dcfe241552d7e08
micronode.go
@@ -90,21 +90,24 @@
   ms.handlers = funcMap
   go ms.startHeartbeat()
   //接收订阅到的消息
   go ms.startRecvSubMsg()
   //作为server启动
   ms.serve()
}
//开始接收订阅消息
func (ms *MicroNode) startRecvSubMsg() {
   for {
      select {
      case <- ms.ctx.Done():
         return
      default:
         msgS, msgR, keyR := ms.handle.GetMsg()
         msgS := ms.handle.GetMsg()
         if msgS != nil {
            //收到其它进程的发布消息
            ms.printLog("Recv Sub Message:", string(msgS.Body))
            ms.SubCh <- msgS
         }
         if msgR != nil {
            //收到其它进程的请求消息
            go ms.serve(msgR, keyR)
         }
         time.Sleep(50 * time.Millisecond)
@@ -189,39 +192,49 @@
   return nil, fmt.Errorf("GetRegisteredClient list failed")
}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
   ri := &Reply{}
   if ms.handlers == nil {
      return
   }
   var reqBody Request
   var ri *Reply
   err := json.Unmarshal(msgR.Body, &reqBody)
   if err != nil {
      ms.printLog("serve unmarshal msgR.Body err:", err)
      ri = &Reply {
         Msg: err.Error(),
      }
      ri.Msg = "send wrong addr, check yourself!!!"
   } else {
      ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
      var msgR MsgInfo
      err := json.Unmarshal(rdata, &msgR)
      if err != nil {
         ri.Msg = err.Error()
      } else {
         var reqBody Request
         err = json.Unmarshal(rdata, &msgR.Body)
         if err != nil {
            ri.Msg = err.Error()
         } else {
            ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
      if f,ok := ms.handlers[reqBody.Path];ok {
         reqBody.SrcProc = msgR.SrcProc
         ri = f(&reqBody)
         ms.printLog("call funcMap f,reply:", *ri)
      } else {
         ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
         ri = &Reply{
            Success: false,
            Msg: "请求的接口不存在,请检查url",
            Data: "请求的接口不存在,请检查url",
               ri.Msg = "请求的接口不存在,请检查url"
         }
      }
      }
   }
   result, err := json.Marshal(*ri)
   if err != nil {
      sdata = nil
   } else {
      sdata = &result
   }
   return ri.Success
   }
   retErr := ms.handle.Reply(p, ri)
   if retErr != nil {
      ms.printLog("retErr:", retErr)
func (ms *MicroNode) serve() {
   if ms.handlers == nil {
      return
   }
   for i:=0;i<10;i++ {
      ms.handle.wg.Add(1)
      go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
   }
}