liuxiaolong
2020-12-29 913063a73b9ffb0d0a1cf3515cec9326102e86e1
添加请求注册中心的RequestCenter方法,以及获取本机所有已注册进程列表
3个文件已修改
60 ■■■■■ 已修改文件
hbusc.go 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -441,6 +441,29 @@
    return nil
}
func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) {
    data, err := json.Marshal(*req)
    if err != nil {
        return nil, err
    }
    rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: KEY_QUERY,
    })
    h.mtxWorker.Lock()
    defer h.mtxWorker.Unlock()
    var ret []bhomebus.Mesg
    if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut);n == 0 {
        if len(ret) > 0 {
            var cr *CommonReply
            if err = json.Unmarshal(ret[0].Data, cr); err == nil {
                return cr, nil
            }
        }
    }
    return nil, fmt.Errorf("request center err")
}
//向主题通道中发布消息
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
    data,err := json.Marshal(*msg)
message.go
@@ -4,7 +4,7 @@
    PORT_DEFAULTPROXY int = 5000
    KEY_REGISTER      int = 101 //处理TOPIC_REGISTER / TOPIC_LEAVE和TOPIC_UPDATETOPIC
    KEY_HEARTBEAT     int = 102 //处理TOPIC_HEARTBEAT
    KEY_QUERY         int = 103 //处理TOPIC_QUERYKEY和TOPIC_QUERYTOPIC
    KEY_QUERY         int = 103 //处理TOPIC_QUERYKEY和TOPIC_QUERYTOPIC和TOPIC_QUERYPROC
)
const (
@@ -16,6 +16,7 @@
    TOPIC_UPDATETOPIC        string = "Topic_UpdateTopic"
    TOPIC_QUERYKEY           string = "Topic_QueryKey"
    TOPIC_QUERYTOPIC         string = "Topic_QueryTopic"
    TOPIC_QUERYPROC          string = "Topic_QueryProc"
)
const (
@@ -105,3 +106,13 @@
    Topic2Key         map[string]int         `json:"topic2Key"`      //topic-replyKey的对应关系
    Status            int                    `json:"status"`            //节点状态
}
//已注册的Proc进程信息
type RegisteredClient struct {
    Info           RegisterInfo         `json:"info"`
    ReplyKey       int                    `json:"replyKey"`
    HeartbeatCount int                    `json:"heartbeatCount"`
    DeadCount      int                    `json:"deadCount"`
    Status         int                    `json:"status"`
}
micronode.go
@@ -191,6 +191,30 @@
    return netNodes
}
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
    r := MsgInfo{
        SrcProc: *ms.procInfo,
        MsgType: MesgType_ReqRep,
        Topic: TOPIC_QUERYPROC,
    }
    cr, err := ms.handle.RequestCenter(&r)
    if err != nil {
        return nil, err
    }
    if cr.Status == REPLY_SUCCESS && cr.Body != nil {
        var list []RegisteredClient
        err = json.Unmarshal(cr.Body, &list)
        if err == nil {
            return list, nil
        } else {
            ms.printLog("unmarshal to RegisteredClient list err:", err)
        }
    } else {
        ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc)
    }
    return nil, fmt.Errorf("GetRegisteredClient list failed")
}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
    var reqBody Request
    err := json.Unmarshal(msgR.Body, &reqBody)