liuxiaolong
2021-01-08 986c949195d1f50758602f198cae76d56c3b8d56
如果没有 PubTopic 或者 SubTopic,则不启动 server 接收
2个文件已修改
58 ■■■■■ 已修改文件
hbusc.go 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -192,24 +192,27 @@
        }
    }
    handle.wg = &sync.WaitGroup{}
    sockReply := bhomebus.OpenSocket()
    sockReply.ForceBind(int(regR.ReplyKey))
    handle.wg.Add(1)
    //serve server reply
    go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
    handle.sockRep = &sockServer{
        sock: sockReply,
        info: &ri.Proc,
    }
    //维持心跳的socket
    sockHB := bhomebus.OpenSocket()
    handle.sockHB = &sockClient{
        sock: sockHB,
        peer: int(regR.HeartbeatKey),
    }
    handle.wg = &sync.WaitGroup{}
    if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
        sockReply := bhomebus.OpenSocket()
        sockReply.ForceBind(int(regR.ReplyKey))
        handle.wg.Add(1)
        //serve server reply
        go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        handle.sockRep = &sockServer{
            sock: sockReply,
            info: &ri.Proc,
        }
    }
    //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key
    sockPub := bhomebus.OpenSocket()
@@ -218,19 +221,22 @@
        peer: -1,
    }
    //订阅消息的socket
    sockSub := bhomebus.OpenSocket()
    //订阅所有主题
    for _,v := range ri.SubTopic {
        sockSub.Sub(v)
    }
    //有订阅消息才需要启动协程接收消息
    if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
        //订阅消息的socket
        sockSub := bhomebus.OpenSocket()
        //订阅所有主题
        for _,v := range ri.SubTopic {
            sockSub.Sub(v)
        }
    //启动订阅信息接收
    handle.wg.Add(1)
    go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
    handle.sockSub = &sockClient{
        sock: sockSub,
        peer: -1,
        //启动订阅信息接收
        handle.wg.Add(1)
        go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
        handle.sockSub = &sockClient{
            sock: sockSub,
            peer: -1,
        }
    }
    sockWorker := bhomebus.OpenSocket()
micronode.go
@@ -22,7 +22,7 @@
    SubCh         chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
    conf := NewConfig(KEY_REGISTER,512,5,100,100,100, fnLog)
    handle, err := Register(ctx, q, conf, reg)
    if err != nil {
@@ -33,7 +33,7 @@
        serverId: serverId,
        handle:   handle,
        reg:      reg,
        procInfo: procInfo,
        procInfo: &reg.Proc,
        fnLog:    fnLog,
        SubCh:    make(chan *MsgInfo, 512),
    }