| | |
| | | ) |
| | | |
| | | type MsgReq struct { |
| | | ProcId string |
| | | ProcId string |
| | | bhome_msg.MsgRequestTopic |
| | | Src unsafe.Pointer |
| | | Src unsafe.Pointer |
| | | } |
| | | |
| | | type BHBus struct { |
| | | ctx context.Context |
| | | ctx context.Context |
| | | |
| | | ri *RegisterInfo |
| | | ri *RegisterInfo |
| | | |
| | | conf *Config |
| | | conf *Config |
| | | |
| | | nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步 |
| | | mtxNode sync.Mutex //访问节点主题表时,需要加锁 |
| | | nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步 |
| | | mtxNode sync.Mutex //访问节点主题表时,需要加锁 |
| | | |
| | | wg *sync.WaitGroup |
| | | wg *sync.WaitGroup |
| | | |
| | | ChSub chan bhome_msg.MsgPublish |
| | | ChReply chan MsgReq |
| | | } |
| | | |
| | | //获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。 |
| | | func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) { |
| | | func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) { |
| | | var procId string |
| | | var msg bhome_msg.MsgRequestTopic |
| | | var src unsafe.Pointer |
| | |
| | | } |
| | | |
| | | //Register |
| | | func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { |
| | | handle := &BHBus { |
| | | func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) { |
| | | handle := &BHBus{ |
| | | ctx: ctx, |
| | | conf: config, |
| | | ri: ri, |
| | | wg: &sync.WaitGroup{}, |
| | | wg: &sync.WaitGroup{}, |
| | | ChSub: make(chan bhome_msg.MsgPublish, config.chSize), |
| | | ChReply: make(chan MsgReq, config.chSize), |
| | | } |
| | |
| | | //如果注册失败,就会一直尝试注册 |
| | | procI := bhome_msg.ProcInfo{ |
| | | ProcId: []byte(ri.Proc.ID), |
| | | Name: []byte(ri.Proc.Name), |
| | | Name: []byte(ri.Proc.Name), |
| | | } |
| | | var regReply bhome_msg.MsgCommonReply |
| | | loop: |
| | |
| | | select { |
| | | case <-q: |
| | | handle.printLog("register <-q") |
| | | return nil,errors.New("ctx is done") |
| | | return nil, errors.New("ctx is done") |
| | | default: |
| | | |
| | | if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) { |
| | |
| | | if ri.PubTopic != nil && len(ri.PubTopic) > 0 { |
| | | topics := bhome_msg.MsgTopicList{} |
| | | var regTopicReply bhome_msg.MsgCommonReply |
| | | for _,t := range ri.PubTopic { |
| | | for _, t := range ri.PubTopic { |
| | | topics.TopicList = append(topics.TopicList, []byte(t)) |
| | | } |
| | | loopRT: |
| | |
| | | go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog) |
| | | } |
| | | |
| | | handle.printLog("register done!" ) |
| | | handle.printLog("register done!") |
| | | |
| | | //有订阅消息才需要启动协程接收消息 |
| | | if len(ri.SubTopic) > 0 { |
| | | handle.printLog("sub topics") |
| | | var subList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubTopic { |
| | | for _, v := range ri.SubTopic { |
| | | subList.TopicList = append(subList.TopicList, []byte(v)) |
| | | } |
| | | |
| | |
| | | if len(ri.SubNetTopic) > 0 { |
| | | handle.printLog("sub net topics") |
| | | var subNetList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubNetTopic { |
| | | for _, v := range ri.SubNetTopic { |
| | | subNetList.TopicList = append(subNetList.TopicList, []byte(v)) |
| | | } |
| | | var subNetReply bhome_msg.MsgCommonReply |
| | |
| | | return handle, nil |
| | | } |
| | | |
| | | func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) { |
| | | func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) { |
| | | var procId string |
| | | var msg bhome_msg.MsgPublish |
| | | for { |
| | |
| | | h.printLog("DeRegister") |
| | | req := bhome_msg.ProcInfo{ |
| | | ProcId: []byte(h.ri.Proc.ID), |
| | | Name: []byte(h.ri.Proc.Name), |
| | | Name: []byte(h.ri.Proc.Name), |
| | | } |
| | | reply := bhome_msg.MsgCommonReply{} |
| | | if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) { |
| | |
| | | h.printLog("h.wg.Wait done") |
| | | } |
| | | |
| | | |
| | | //HeartBeat send |
| | | func (h *BHBus) HeartBeat() error { |
| | | procI := bhome_msg.ProcInfo{ |
| | | ProcId: []byte(h.ri.Proc.ID), |
| | | Name: []byte(h.ri.Proc.Name), |
| | | Name: []byte(h.ri.Proc.Name), |
| | | } |
| | | var ret bhome_msg.MsgCommonReply |
| | | if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) { |
| | |
| | | return errors.New("send heartBeat return false") |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | //更新主题列表 |
| | | func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { |
| | |
| | | //获取topic对应的key |
| | | //如果传了serverId不为空,则获取指定机器上的topic-key |
| | | //如果server为空,则获取所有节点上topic-key |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) { |
| | | dest := bhome_msg.BHAddress{} |
| | | reqTopic := bhome_msg.MsgQueryTopic{ |
| | | Topic: []byte(topic), |
| | |
| | | var reply Reply |
| | | if err := json.Unmarshal(mrt.Data, &reply); err != nil { |
| | | h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data)) |
| | | return nil,err |
| | | return nil, err |
| | | } |
| | | |
| | | return &reply, nil |
| | |
| | | } |
| | | |
| | | func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error { |
| | | data,err := json.Marshal(i) |
| | | data, err := json.Marshal(i) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | rep := bhome_msg.MsgRequestTopicReply{ |
| | | Data: data, |
| | | } |
| | | |
| | | if bhsgo.SendReply(src, &rep) { |
| | | return nil |
| | | } |
| | |
| | | return nil, errors.New("QueryProcs ret flase") |
| | | } |
| | | } |
| | | |
| | | |
| | | //向主题通道中发布消息 |
| | | func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error { |
| | |
| | | |
| | | //追加订阅的主题消息 |
| | | func (h *BHBus) Sub(topics []string) { |
| | | if topics != nil && len(topics) >0 { |
| | | if topics != nil && len(topics) > 0 { |
| | | var subList bhome_msg.MsgTopicList |
| | | for _, v := range topics { |
| | | subList.TopicList = append(subList.TopicList, []byte(v)) |
| | |
| | | if topics != nil { |
| | | |
| | | } |
| | | } |
| | | } |