liuxiaolong
2021-04-02 a35153875f213929601a39c47f0823b310210321
micronode.go
@@ -7,6 +7,7 @@
   "errors"
   "fmt"
   "os"
   "sync"
   "time"
)
@@ -20,6 +21,9 @@
   fnLog       func(...interface{})
   SubCh       chan *MsgInfo
   mtx       sync.Mutex
   started    bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
@@ -83,41 +87,51 @@
}
func (ms *MicroNode) StartClient() {
   go ms.startHeartbeat()
   ms.mtx.Lock()
   defer ms.mtx.Unlock()
   if !ms.started {
      ms.started = true
      go ms.startHeartbeat()
   }
}
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
   ms.handlers = funcMap
   ms.mtx.Lock()
   if !ms.started {
      ms.started = true
      ms.mtx.Unlock()
   go ms.startHeartbeat()
      ms.handlers = funcMap
   for {
      select {
      case <- ms.ctx.Done():
         return
      default:
         msgS, msgR, keyR := ms.handle.GetMsg()
         if msgS != nil {
            //收到其它进程的发布消息
            ms.printLog("Recv Sub Message:", string(msgS.Body))
            ms.SubCh <- msgS
      go ms.startHeartbeat()
      for {
         select {
         case <- ms.ctx.Done():
            return
         default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
               //收到其它进程的发布消息
               ms.printLog("Recv Sub Message:", string(msgS.Body))
               ms.SubCh <- msgS
            }
            if msgR != nil {
               //收到其它进程的请求消息
               go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
         }
         if msgR != nil {
            //收到其它进程的请求消息
            go ms.serve(msgR, keyR)
         }
         time.Sleep(50 * time.Millisecond)
      }
      //接收订阅到的消息
      //go ms.startRecvSubMsg()
      //作为server启动
      //ms.serve()
   }
   //接收订阅到的消息
   //go ms.startRecvSubMsg()
   //作为server启动
   //ms.serve()
   ms.mtx.Unlock()
}
//开始接收订阅消息
@@ -284,7 +298,11 @@
      if f,ok := ms.handlers[reqBody.Path];ok {
         reqBody.SrcProc = msgR.SrcProc
         ri = f(&reqBody)
         ctx := Context{
            ms,
            ms,
         }
         ri = f(&ctx, &reqBody)
         ms.printLog("call funcMap f,reply.Success:", ri.Success)
      } else {
         ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)