| | |
| | | //发布到远程机器 |
| | | PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error |
| | | |
| | | //订阅一些主题,可动态新增 |
| | | Subscribe(topics []string) |
| | | |
| | | //注销订阅的主题 |
| | | DeSub(topics []string) |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | //注销订阅的主题 |
| | | func (h *BHBus) DeSub(topics []string) { |
| | | if topics != nil { |
| | | for _,t := range topics { |
| | | if n := h.sockSub.sock.Desub(t); n != 0 { |
| | | h.printLog("DeSub topic:", t, " n:", n) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | //获取sub 或者需要reply的消息 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | |
| | | SubChM map[string]chan *MsgInfo //以订阅的主题为key |
| | | SubCh chan *MsgInfo |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ |
| | |
| | | handle: handle, |
| | | reg: reg, |
| | | procInfo: procInfo, |
| | | fnLog: fnLog, |
| | | SubChM: make(map[string]chan *MsgInfo), |
| | | } |
| | | for _,subTopic := range reg.SubTopic { |
| | | mn.SubChM[subTopic] = make(chan *MsgInfo, 512) |
| | | fnLog: fnLog, |
| | | SubCh: make(chan *MsgInfo, 512), |
| | | } |
| | | |
| | | return mn, nil |
| | |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | if ch,ok := ms.SubChM[msgS.Topic];ok { |
| | | ch <- msgS |
| | | } |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | | |
| | | //订阅主题 |
| | | func (ms *MicroNode) Subscribe(topics []string) { |
| | | ms.handle.Sub(topics) |
| | | for _,t := range topics { |
| | | if ms.reg.SubTopic == nil { |
| | | ms.reg.SubTopic = make([]string, 0) |
| | | } |
| | | found := false |
| | | for _,it := range ms.reg.SubTopic { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | ms.reg.SubTopic = append(ms.reg.SubTopic, t) |
| | | } |
| | | } |
| | | } |
| | | |
| | | //取消订阅的主题 |
| | | func (ms *MicroNode) DeSub(topics []string) { |
| | | ms.printLog("DeSub topics:", topics) |
| | | ms.handle.DeSub(topics) |
| | | if ms.reg.SubTopic != nil { |
| | | var leftTopics []string |
| | | for _,t := range ms.reg.SubTopic { |
| | | found := false |
| | | for _,it := range topics { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | leftTopics = append(leftTopics, t) |
| | | } |
| | | } |
| | | ms.reg.SubTopic = leftTopics |
| | | } |
| | | } |
| | | |
| | | //free handle |