liuxiaolong
2021-04-02 a35153875f213929601a39c47f0823b310210321
micronode.go
@@ -7,6 +7,7 @@
   "errors"
   "fmt"
   "os"
   "sync"
   "time"
)
@@ -20,10 +21,13 @@
   fnLog       func(...interface{})
   SubCh       chan *MsgInfo
   mtx       sync.Mutex
   started    bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,1000,100,1000, fnLog)
   conf := NewConfig(KEY_REGISTER,512,5,5000,5000,2000, fnLog)
   handle, err := Register(ctx, q, conf, reg)
   if err != nil {
      return nil, err
@@ -83,32 +87,75 @@
}
func (ms *MicroNode) StartClient() {
   go ms.startHeartbeat()
   ms.mtx.Lock()
   defer ms.mtx.Unlock()
   if !ms.started {
      ms.started = true
      go ms.startHeartbeat()
   }
}
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
   ms.handlers = funcMap
   ms.mtx.Lock()
   if !ms.started {
      ms.started = true
      ms.mtx.Unlock()
   go ms.startHeartbeat()
      ms.handlers = funcMap
   for {
      select {
      case <- ms.ctx.Done():
         return
      default:
         msgS, msgR, keyR := ms.handle.GetMsg()
         if msgS != nil {
            //收到其它进程的发布消息
            ms.printLog("Recv Sub Message:", string(msgS.Body))
            ms.SubCh <- msgS
         }
         if msgR != nil {
            //收到其它进程的请求消息
            go ms.serve(msgR, keyR)
      go ms.startHeartbeat()
      for {
         select {
         case <- ms.ctx.Done():
            return
         default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
               //收到其它进程的发布消息
               ms.printLog("Recv Sub Message:", string(msgS.Body))
               ms.SubCh <- msgS
            }
            if msgR != nil {
               //收到其它进程的请求消息
               go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
         }
      }
      //接收订阅到的消息
      //go ms.startRecvSubMsg()
      //作为server启动
      //ms.serve()
   }
   ms.mtx.Unlock()
}
//开始接收订阅消息
//func (ms *MicroNode) startRecvSubMsg() {
//   for {
//      select {
//      case <- ms.ctx.Done():
//         return
//      default:
//         msgS, msgR, keyR := ms.handle.GetMsg()
//         if msgS != nil {
//            //收到其它进程的发布消息
//            ms.printLog("Recv Sub Message:", string(msgS.Body))
//            ms.SubCh <- msgS
//         }
//         if msgR != nil {
//            //收到其它进程的请求消息
//            go ms.serve(msgR, keyR)
//         }
//
//         time.Sleep(50 * time.Millisecond)
//      }
//   }
//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
   t := time.Now()
@@ -187,6 +234,52 @@
   return nil, fmt.Errorf("GetRegisteredClient list failed")
}
//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
//   ri := &Reply{}
//   if ms.handlers == nil {
//      ri.Msg = "send wrong addr, check yourself!!!"
//   } else {
//      var msgR MsgInfo
//      err := json.Unmarshal(rdata, &msgR)
//      if err != nil {
//         ri.Msg = err.Error()
//      } else {
//         var reqBody Request
//         err = json.Unmarshal(rdata, &msgR.Body)
//         if err != nil {
//            ri.Msg = err.Error()
//         } else {
//            ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
//            if f,ok := ms.handlers[reqBody.Path];ok {
//               reqBody.SrcProc = msgR.SrcProc
//               ri = f(&reqBody)
//               ms.printLog("call funcMap f,reply:", *ri)
//            } else {
//               ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
//               ri.Msg = "请求的接口不存在,请检查url"
//            }
//         }
//      }
//   }
//   result, err := json.Marshal(*ri)
//   if err != nil {
//      sdata = nil
//   } else {
//      sdata = &result
//   }
//   return ri.Success
//}
//func (ms *MicroNode) serve() {
//   if ms.handlers == nil {
//      return
//   }
//   for i:=0;i<10;i++ {
//      ms.handle.wg.Add(1)
//      go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
//   }
//}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
   if ms.handlers == nil {
      return
@@ -201,14 +294,18 @@
         Msg: err.Error(),
      }
   } else {
      ms.printLog("reqBody:", reqBody, "to key: ", p)
      ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
      if f,ok := ms.handlers[reqBody.Path];ok {
         reqBody.SrcProc = msgR.SrcProc
         ri = f(&reqBody)
         ms.printLog("call funcMap f,reply:", *ri)
         ctx := Context{
            ms,
            ms,
         }
         ri = f(&ctx, &reqBody)
         ms.printLog("call funcMap f,reply.Success:", ri.Success)
      } else {
         ms.printLog("ms.funcMap not eixst path")
         ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
         ri = &Reply{
            Success: false,
            Msg: "请求的接口不存在,请检查url",
@@ -239,6 +336,14 @@
   return ms.handle.Pub(nodes, pi)
}
func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
   pi := &MsgInfo{
      Topic: topic,
      Body: msg,
   }
   return ms.handle.PubTimeout(nodes, pi, timeout)
}
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
   ms.handle.Sub(topics)