zhangmeng
2023-12-05 2d5c411a22a653eb7cbde621db4e89b07755a852
hbusc.go
@@ -14,29 +14,29 @@
)
type MsgReq struct {
   ProcId       string
   ProcId string
   bhome_msg.MsgRequestTopic
   Src       unsafe.Pointer
   Src unsafe.Pointer
}
type BHBus struct {
   ctx       context.Context
   ctx context.Context
   ri          *RegisterInfo
   ri *RegisterInfo
   conf       *Config
   conf *Config
   nodes       []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
   mtxNode    sync.Mutex   //访问节点主题表时,需要加锁
   nodes   []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
   mtxNode sync.Mutex //访问节点主题表时,需要加锁
   wg          *sync.WaitGroup
   wg *sync.WaitGroup
   ChSub   chan bhome_msg.MsgPublish
   ChReply chan MsgReq
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) {
   var procId string
   var msg bhome_msg.MsgRequestTopic
   var src unsafe.Pointer
@@ -66,12 +66,12 @@
}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
   handle := &BHBus {
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) {
   handle := &BHBus{
      ctx:     ctx,
      conf:    config,
      ri:      ri,
      wg: &sync.WaitGroup{},
      wg:      &sync.WaitGroup{},
      ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
      ChReply: make(chan MsgReq, config.chSize),
   }
@@ -79,7 +79,7 @@
   //如果注册失败,就会一直尝试注册
   procI := bhome_msg.ProcInfo{
      ProcId: []byte(ri.Proc.ID),
      Name: []byte(ri.Proc.Name),
      Name:   []byte(ri.Proc.Name),
   }
   var regReply bhome_msg.MsgCommonReply
loop:
@@ -87,7 +87,7 @@
      select {
      case <-q:
         handle.printLog("register <-q")
         return nil,errors.New("ctx is done")
         return nil, errors.New("ctx is done")
      default:
         if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
@@ -101,7 +101,7 @@
   if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
      topics := bhome_msg.MsgTopicList{}
      var regTopicReply bhome_msg.MsgCommonReply
      for _,t := range ri.PubTopic {
      for _, t := range ri.PubTopic {
         topics.TopicList = append(topics.TopicList, []byte(t))
      }
   loopRT:
@@ -124,13 +124,13 @@
      go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
   }
   handle.printLog("register done!" )
   handle.printLog("register done!")
   //有订阅消息才需要启动协程接收消息
   if len(ri.SubTopic) > 0 {
      handle.printLog("sub topics")
      var subList bhome_msg.MsgTopicList
      for _,v := range ri.SubTopic {
      for _, v := range ri.SubTopic {
         subList.TopicList = append(subList.TopicList, []byte(v))
      }
@@ -143,7 +143,7 @@
   if len(ri.SubNetTopic) > 0 {
      handle.printLog("sub net topics")
      var subNetList bhome_msg.MsgTopicList
      for _,v := range ri.SubNetTopic {
      for _, v := range ri.SubNetTopic {
         subNetList.TopicList = append(subNetList.TopicList, []byte(v))
      }
      var subNetReply bhome_msg.MsgCommonReply
@@ -161,7 +161,7 @@
   return handle, nil
}
func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) {
   var procId string
   var msg bhome_msg.MsgPublish
   for {
@@ -190,7 +190,7 @@
   h.printLog("DeRegister")
   req := bhome_msg.ProcInfo{
      ProcId: []byte(h.ri.Proc.ID),
      Name: []byte(h.ri.Proc.Name),
      Name:   []byte(h.ri.Proc.Name),
   }
   reply := bhome_msg.MsgCommonReply{}
   if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
@@ -214,12 +214,11 @@
   h.printLog("h.wg.Wait done")
}
//HeartBeat send
func (h *BHBus) HeartBeat() error {
   procI := bhome_msg.ProcInfo{
      ProcId: []byte(h.ri.Proc.ID),
      Name: []byte(h.ri.Proc.Name),
      Name:   []byte(h.ri.Proc.Name),
   }
   var ret bhome_msg.MsgCommonReply
   if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
@@ -228,8 +227,6 @@
      return errors.New("send heartBeat return false")
   }
}
//更新主题列表
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
@@ -241,7 +238,7 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) {
   dest := bhome_msg.BHAddress{}
   reqTopic := bhome_msg.MsgQueryTopic{
      Topic: []byte(topic),
@@ -267,7 +264,7 @@
      var reply Reply
      if err := json.Unmarshal(mrt.Data, &reply); err != nil {
         h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
         return nil,err
         return nil, err
      }
      return &reply, nil
@@ -297,13 +294,14 @@
}
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
   data,err := json.Marshal(i)
   data, err := json.Marshal(i)
   if err != nil {
      return err
   }
   rep := bhome_msg.MsgRequestTopicReply{
      Data: data,
   }
   if bhsgo.SendReply(src, &rep) {
      return nil
   }
@@ -321,7 +319,6 @@
      return nil, errors.New("QueryProcs ret flase")
   }
}
//向主题通道中发布消息
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
@@ -341,7 +338,7 @@
//追加订阅的主题消息
func (h *BHBus) Sub(topics []string) {
   if topics != nil && len(topics) >0 {
   if topics != nil && len(topics) > 0 {
      var subList bhome_msg.MsgTopicList
      for _, v := range topics {
         subList.TopicList = append(subList.TopicList, []byte(v))
@@ -359,4 +356,4 @@
   if topics != nil {
   }
}
}