liuxiaolong
2021-02-05 81afb6ffbf7f76f49644a1832dcfe241552d7e08
micronode.go
@@ -1,4 +1,4 @@
package mc
package bhomeclient
import (
   "basic.com/valib/bhomebus.git"
@@ -19,25 +19,23 @@
   serverId    string
   fnLog       func(...interface{})
   SubChM       map[string]chan *MsgInfo //以订阅的主题为key
   SubCh       chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog)
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
   conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog)
   handle, err := Register(ctx, q, conf, reg)
   if err != nil {
      return nil, err
   }
   mn := &MicroNode {
      ctx: ctx,
      serverId: serverId,
      handle:   handle,
      reg:      reg,
      procInfo: procInfo,
      fnLog: fnLog,
      SubChM:   make(map[string]chan *MsgInfo),
   }
   for _,subTopic := range reg.SubTopic {
      mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
      procInfo: &reg.Proc,
      fnLog:    fnLog,
      SubCh:    make(chan *MsgInfo, 512),
   }
   return mn, nil
@@ -71,7 +69,7 @@
      Proc:    *ms.procInfo,
   }
   t := time.NewTicker(time.Second)
   t := time.NewTicker(1 * time.Second)
   defer t.Stop()
   for {
@@ -92,35 +90,34 @@
   ms.handlers = funcMap
   go ms.startHeartbeat()
   //接收订阅到的消息
   go ms.startRecvSubMsg()
   //作为server启动
   ms.serve()
}
//开始接收订阅消息
func (ms *MicroNode) startRecvSubMsg() {
   for {
      select {
      case <- ms.ctx.Done():
         return
      default:
         msgS, msgR, keyR := ms.handle.GetMsg()
         msgS := ms.handle.GetMsg()
         if msgS != nil {
            //收到其它进程的发布消息
            ms.printLog("Recv Sub Message:", string(msgS.Body))
            if ch,ok := ms.SubChM[msgS.Topic];ok {
               ch <- msgS
            }
            ms.SubCh <- msgS
         }
         if msgR != nil {
            //收到其它进程的请求消息
            go ms.serve(msgR, keyR)
         }
         time.Sleep(50 * time.Millisecond)
      }
   }
}
func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
   t := time.Now()
   topicName := request.Header("Servicename")
   if topicName == "" {
      return nil,errors.New("Servicename 不能为空")
   }
   ms.printLog("1:", time.Since(t))
   t = time.Now()
   rb, _ := json.Marshal(request)
@@ -129,101 +126,123 @@
      Body: rb,
   }
   ms.printLog("2:", time.Since(t))
   t = time.Now()
   mi,err := ms.handle.Request(serverId, msgR, 5000)
   if mi == nil || err != nil {
      return nil, err
   }
   ms.printLog("3:", time.Since(t))
   t = time.Now()
   ri := new(Reply)
   err = json.Unmarshal(mi.Body, ri)
   if err != nil {
      ms.printLog("unmarshal mi.Body err:", err)
      ri = &Reply{
         Success: false,
         Msg: "服务请求失败",
         Data: "服务请求失败",
      }
   }
   ms.printLog("4:", time.Since(t))
   return ri, nil
   return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
   rb, _ := json.Marshal(request)
   msgR := &MsgInfo{
      Topic: request.Path,
      Body: rb,
   }
   mi, err := ms.handle.Request(serverId, msgR, 5000)
   if err != nil {
      return nil, err
   }
   var ri *Reply
   err = json.Unmarshal(mi.Body, ri)
   if err != nil {
      ri = &Reply{
         Success: false,
         Msg: "服务请求失败",
         Data: "服务请求失败",
      }
   }
   return ri, nil
   return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
   return ms.handle.RequestOnly(rData, nodes)
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
   if err != nil {
      ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
      return nil
   }
   return netNodes
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
   netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
   if err != nil {
      return nil
   }
   return netNodes
}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
   var reqBody Request
   err := json.Unmarshal(msgR.Body, &reqBody)
   if err != nil {
      ms.printLog("serve unmarshal msgR.Body err:", err)
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
   r := MsgInfo{
      SrcProc: *ms.procInfo,
      MsgType: MesgType_ReqRep,
      Topic: TOPIC_QUERYPROC,
   }
   ms.printLog("reqBody:", reqBody)
   var ri *Reply
   if f,ok := ms.handlers[reqBody.Path];ok {
      ri = f(&reqBody)
      ms.printLog("call funcMap f,reply:", *ri)
   cr, err := ms.handle.RequestCenter(&r)
   if err != nil {
      ms.printLog("requestCenter reply:", cr, "err:", err)
      return nil, err
   }
   if cr.Success {
      rd,err := json.Marshal(cr.Data)
      if err == nil {
         var list []RegisteredClient
         err = json.Unmarshal(rd, &list)
         if err == nil {
            return list, nil
         } else {
            ms.printLog("unmarshal to RegisteredClient list err:", err)
         }
      } else {
         return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
      }
   } else {
      ms.printLog("ms.funcMap not eixst path")
      ri = &Reply{
         Success: false,
         Msg: "请求的接口不存在,请检查url",
         Data: "请求的接口不存在,请检查url",
      ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data)
   }
   return nil, fmt.Errorf("GetRegisteredClient list failed")
}
func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
   ri := &Reply{}
   if ms.handlers == nil {
      ri.Msg = "send wrong addr, check yourself!!!"
   } else {
      var msgR MsgInfo
      err := json.Unmarshal(rdata, &msgR)
      if err != nil {
         ri.Msg = err.Error()
      } else {
         var reqBody Request
         err = json.Unmarshal(rdata, &msgR.Body)
         if err != nil {
            ri.Msg = err.Error()
         } else {
            ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
            if f,ok := ms.handlers[reqBody.Path];ok {
               reqBody.SrcProc = msgR.SrcProc
               ri = f(&reqBody)
               ms.printLog("call funcMap f,reply:", *ri)
            } else {
               ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
               ri.Msg = "请求的接口不存在,请检查url"
            }
         }
      }
   }
   rd,err := json.Marshal(*ri)
   result, err := json.Marshal(*ri)
   if err != nil {
      ms.printLog("marshal *ri err:", err)
      sdata = nil
   } else {
      sdata = &result
   }
   rMsg := MsgInfo{
      Body: rd,
   return ri.Success
}
func (ms *MicroNode) serve() {
   if ms.handlers == nil {
      return
   }
   ms.handle.Reply(p, rMsg)
   for i:=0;i<10;i++ {
      ms.handle.wg.Add(1)
      go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
   }
}
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
   nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
   nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
      Key: 8,
   })
   return ms.PublishNet(nodes, topic, msg)
}
@@ -235,9 +254,54 @@
   return ms.handle.Pub(nodes, pi)
}
func (ms *MicroNode) Subscribe(topics []string) chan []byte {
   ch := make(chan []byte)
   return ch
func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
   pi := &MsgInfo{
      Topic: topic,
      Body: msg,
   }
   return ms.handle.PubTimeout(nodes, pi, timeout)
}
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
   ms.handle.Sub(topics)
   for _,t := range topics {
      if ms.reg.SubTopic == nil {
         ms.reg.SubTopic = make([]string, 0)
      }
      found := false
      for _,it := range ms.reg.SubTopic {
         if it == t {
            found = true
            break
         }
      }
      if !found {
         ms.reg.SubTopic = append(ms.reg.SubTopic, t)
      }
   }
}
//取消订阅的主题
func (ms *MicroNode) DeSub(topics []string) {
   ms.printLog("DeSub topics:", topics)
   ms.handle.DeSub(topics)
   if ms.reg.SubTopic != nil {
      var leftTopics []string
      for _,t := range ms.reg.SubTopic {
         found := false
         for _,it := range topics {
            if it == t {
               found = true
               break
            }
         }
         if !found {
            leftTopics = append(leftTopics, t)
         }
      }
      ms.reg.SubTopic = leftTopics
   }
}
//free handle