From b93b67a7e237cbf59569dcaacfa84257856ba16e Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期三, 30 十二月 2020 14:19:34 +0800 Subject: [PATCH] 添加DeSub,优化订阅主题消息通道 --- micronode.go | 51 +++++++++++++++++++++---- broker.go | 4 ++ hbusc.go | 11 +++++ 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/broker.go b/broker.go index 48d3d72..cc78e6e 100644 --- a/broker.go +++ b/broker.go @@ -9,5 +9,9 @@ //鍙戝竷鍒拌繙绋嬫満鍣� PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error + //璁㈤槄涓�浜涗富棰�,鍙姩鎬佹柊澧� Subscribe(topics []string) + + //娉ㄩ攢璁㈤槄鐨勪富棰� + DeSub(topics []string) } diff --git a/hbusc.go b/hbusc.go index 6ff4175..def5a75 100644 --- a/hbusc.go +++ b/hbusc.go @@ -487,6 +487,17 @@ } } +//娉ㄩ攢璁㈤槄鐨勪富棰� +func (h *BHBus) DeSub(topics []string) { + if topics != nil { + for _,t := range topics { + if n := h.sockSub.sock.Desub(t); n != 0 { + h.printLog("DeSub topic:", t, " n:", n) + } + } + } +} + //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { diff --git a/micronode.go b/micronode.go index 46731d7..1cc69d7 100644 --- a/micronode.go +++ b/micronode.go @@ -19,7 +19,7 @@ serverId string fnLog func(...interface{}) - SubChM map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey + SubCh chan *MsgInfo } func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ @@ -33,11 +33,8 @@ handle: handle, reg: reg, procInfo: procInfo, - fnLog: fnLog, - SubChM: make(map[string]chan *MsgInfo), - } - for _,subTopic := range reg.SubTopic { - mn.SubChM[subTopic] = make(chan *MsgInfo, 512) + fnLog: fnLog, + SubCh: make(chan *MsgInfo, 512), } return mn, nil @@ -102,9 +99,7 @@ if msgS != nil { //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� ms.printLog("Recv Sub Message:", string(msgS.Body)) - if ch,ok := ms.SubChM[msgS.Topic];ok { - ch <- msgS - } + ms.SubCh <- msgS } if msgR != nil { //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� @@ -259,8 +254,46 @@ return ms.handle.Pub(nodes, pi) } +//璁㈤槄涓婚 func (ms *MicroNode) Subscribe(topics []string) { ms.handle.Sub(topics) + for _,t := range topics { + if ms.reg.SubTopic == nil { + ms.reg.SubTopic = make([]string, 0) + } + found := false + for _,it := range ms.reg.SubTopic { + if it == t { + found = true + break + } + } + if !found { + ms.reg.SubTopic = append(ms.reg.SubTopic, t) + } + } +} + +//鍙栨秷璁㈤槄鐨勪富棰� +func (ms *MicroNode) DeSub(topics []string) { + ms.printLog("DeSub topics:", topics) + ms.handle.DeSub(topics) + if ms.reg.SubTopic != nil { + var leftTopics []string + for _,t := range ms.reg.SubTopic { + found := false + for _,it := range topics { + if it == t { + found = true + break + } + } + if !found { + leftTopics = append(leftTopics, t) + } + } + ms.reg.SubTopic = leftTopics + } } //free handle -- Gitblit v1.8.0