From b93b67a7e237cbf59569dcaacfa84257856ba16e Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期三, 30 十二月 2020 14:19:34 +0800
Subject: [PATCH] 添加DeSub,优化订阅主题消息通道
---
micronode.go | 51 +++++++++++++++++++++----
broker.go | 4 ++
hbusc.go | 11 +++++
3 files changed, 57 insertions(+), 9 deletions(-)
diff --git a/broker.go b/broker.go
index 48d3d72..cc78e6e 100644
--- a/broker.go
+++ b/broker.go
@@ -9,5 +9,9 @@
//鍙戝竷鍒拌繙绋嬫満鍣�
PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
+ //璁㈤槄涓�浜涗富棰�,鍙姩鎬佹柊澧�
Subscribe(topics []string)
+
+ //娉ㄩ攢璁㈤槄鐨勪富棰�
+ DeSub(topics []string)
}
diff --git a/hbusc.go b/hbusc.go
index 6ff4175..def5a75 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -487,6 +487,17 @@
}
}
+//娉ㄩ攢璁㈤槄鐨勪富棰�
+func (h *BHBus) DeSub(topics []string) {
+ if topics != nil {
+ for _,t := range topics {
+ if n := h.sockSub.sock.Desub(t); n != 0 {
+ h.printLog("DeSub topic:", t, " n:", n)
+ }
+ }
+ }
+}
+
//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
diff --git a/micronode.go b/micronode.go
index 46731d7..1cc69d7 100644
--- a/micronode.go
+++ b/micronode.go
@@ -19,7 +19,7 @@
serverId string
fnLog func(...interface{})
- SubChM map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+ SubCh chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
@@ -33,11 +33,8 @@
handle: handle,
reg: reg,
procInfo: procInfo,
- fnLog: fnLog,
- SubChM: make(map[string]chan *MsgInfo),
- }
- for _,subTopic := range reg.SubTopic {
- mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
+ fnLog: fnLog,
+ SubCh: make(chan *MsgInfo, 512),
}
return mn, nil
@@ -102,9 +99,7 @@
if msgS != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
ms.printLog("Recv Sub Message:", string(msgS.Body))
- if ch,ok := ms.SubChM[msgS.Topic];ok {
- ch <- msgS
- }
+ ms.SubCh <- msgS
}
if msgR != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
@@ -259,8 +254,46 @@
return ms.handle.Pub(nodes, pi)
}
+//璁㈤槄涓婚
func (ms *MicroNode) Subscribe(topics []string) {
ms.handle.Sub(topics)
+ for _,t := range topics {
+ if ms.reg.SubTopic == nil {
+ ms.reg.SubTopic = make([]string, 0)
+ }
+ found := false
+ for _,it := range ms.reg.SubTopic {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ ms.reg.SubTopic = append(ms.reg.SubTopic, t)
+ }
+ }
+}
+
+//鍙栨秷璁㈤槄鐨勪富棰�
+func (ms *MicroNode) DeSub(topics []string) {
+ ms.printLog("DeSub topics:", topics)
+ ms.handle.DeSub(topics)
+ if ms.reg.SubTopic != nil {
+ var leftTopics []string
+ for _,t := range ms.reg.SubTopic {
+ found := false
+ for _,it := range topics {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ leftTopics = append(leftTopics, t)
+ }
+ }
+ ms.reg.SubTopic = leftTopics
+ }
}
//free handle
--
Gitblit v1.8.0