liuxiaolong
2021-01-15 c3003a24e2cf4ef49db38b8b392bc7a788554fff
micronode.go
@@ -22,8 +22,8 @@
   SubCh       chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog)
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,1000,100,1000, fnLog)
   handle, err := Register(ctx, q, conf, reg)
   if err != nil {
      return nil, err
@@ -33,7 +33,7 @@
      serverId: serverId,
      handle:   handle,
      reg:      reg,
      procInfo: procInfo,
      procInfo: &reg.Proc,
      fnLog:    fnLog,
      SubCh:    make(chan *MsgInfo, 512),
   }
@@ -69,7 +69,7 @@
      Proc:    *ms.procInfo,
   }
   t := time.NewTicker(time.Second)
   t := time.NewTicker(4 * time.Second)
   defer t.Stop()
   for {
@@ -110,13 +110,9 @@
   }
}
func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
   t := time.Now()
   topicName := request.Header("Servicename")
   if topicName == "" {
      return nil,errors.New("Servicename 不能为空")
   }
   ms.printLog("1:", time.Since(t))
   t = time.Now()
   rb, _ := json.Marshal(request)
@@ -126,7 +122,7 @@
   }
   ms.printLog("2:", time.Since(t))
   t = time.Now()
   mi,err := ms.handle.Request(serverId, msgR, 5000)
   mi,err := ms.handle.Request(serverId, msgR, milliSecs)
   if mi == nil || err != nil {
      return nil, err
   }
@@ -146,14 +142,14 @@
   return ri, nil
}
func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
   rb, _ := json.Marshal(request)
   msgR := &MsgInfo{
      Topic: request.Path,
      Body: rb,
   }
   mi, err := ms.handle.Request(serverId, msgR, 5000)
   mi, err := ms.handle.Request(serverId, msgR, milliSecs)
   if err != nil {
      return nil, err
   }
@@ -173,7 +169,7 @@
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
   if err != nil {
      ms.printLog("netNodes:", netNodes, "err:", err)
      ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
      return nil
   }
   return netNodes
@@ -225,7 +221,7 @@
      return
   }
   ms.printLog("reqBody:", reqBody)
   ms.printLog("reqBody:", reqBody, "to key: ", p)
   var ri *Reply
   if f,ok := ms.handlers[reqBody.Path];ok {
      ri = f(&reqBody)
@@ -245,7 +241,10 @@
   rMsg := MsgInfo{
      Body: rd,
   }
   ms.handle.Reply(p, rMsg)
   retErr := ms.handle.Reply(p, rMsg)
   if retErr != nil {
      ms.printLog("retErr:", retErr)
   }
}
//发布到本机