liuxiaolong
2020-12-30 b93b67a7e237cbf59569dcaacfa84257856ba16e
添加DeSub,优化订阅主题消息通道
3个文件已修改
66 ■■■■ 已修改文件
broker.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 51 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
broker.go
@@ -9,5 +9,9 @@
    //发布到远程机器
    PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
    //订阅一些主题,可动态新增
    Subscribe(topics []string)
    //注销订阅的主题
    DeSub(topics []string)
}
hbusc.go
@@ -487,6 +487,17 @@
    }
}
//注销订阅的主题
func (h *BHBus) DeSub(topics []string) {
    if topics != nil {
        for _,t := range topics {
            if n := h.sockSub.sock.Desub(t); n != 0 {
                h.printLog("DeSub topic:", t, " n:", n)
            }
        }
    }
}
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
micronode.go
@@ -19,7 +19,7 @@
    serverId     string
    fnLog         func(...interface{})
    SubChM         map[string]chan *MsgInfo //以订阅的主题为key
    SubCh         chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
@@ -33,11 +33,8 @@
        handle:   handle,
        reg:      reg,
        procInfo: procInfo,
        fnLog: fnLog,
        SubChM:   make(map[string]chan *MsgInfo),
    }
    for _,subTopic := range reg.SubTopic {
        mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
        fnLog:    fnLog,
        SubCh:    make(chan *MsgInfo, 512),
    }
    return mn, nil
@@ -102,9 +99,7 @@
            if msgS != nil {
                //收到其它进程的发布消息
                ms.printLog("Recv Sub Message:", string(msgS.Body))
                if ch,ok := ms.SubChM[msgS.Topic];ok {
                    ch <- msgS
                }
                ms.SubCh <- msgS
            }
            if msgR != nil {
                //收到其它进程的请求消息
@@ -259,8 +254,46 @@
    return ms.handle.Pub(nodes, pi)
}
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
    ms.handle.Sub(topics)
    for _,t := range topics {
        if ms.reg.SubTopic == nil {
            ms.reg.SubTopic = make([]string, 0)
        }
        found := false
        for _,it := range ms.reg.SubTopic {
            if it == t {
                found = true
                break
            }
        }
        if !found {
            ms.reg.SubTopic = append(ms.reg.SubTopic, t)
        }
    }
}
//取消订阅的主题
func (ms *MicroNode) DeSub(topics []string) {
    ms.printLog("DeSub topics:", topics)
    ms.handle.DeSub(topics)
    if ms.reg.SubTopic != nil {
        var leftTopics []string
        for _,t := range ms.reg.SubTopic {
            found := false
            for _,it := range topics {
                if it == t {
                    found = true
                    break
                }
            }
            if !found {
                leftTopics = append(leftTopics, t)
            }
        }
        ms.reg.SubTopic = leftTopics
    }
}
//free handle