添加请求注册中心的RequestCenter方法,以及获取本机所有已注册进程列表
| | |
| | | return nil |
| | | } |
| | | |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) { |
| | | data, err := json.Marshal(*req) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: KEY_QUERY, |
| | | }) |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | var ret []bhomebus.Mesg |
| | | if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut);n == 0 { |
| | | if len(ret) > 0 { |
| | | var cr *CommonReply |
| | | if err = json.Unmarshal(ret[0].Data, cr); err == nil { |
| | | return cr, nil |
| | | } |
| | | } |
| | | } |
| | | return nil, fmt.Errorf("request center err") |
| | | } |
| | | |
| | | |
| | | //向主题通道中发布消息 |
| | | func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { |
| | | data,err := json.Marshal(*msg) |
| | |
| | | PORT_DEFAULTPROXY int = 5000 |
| | | KEY_REGISTER int = 101 //处理TOPIC_REGISTER / TOPIC_LEAVE和TOPIC_UPDATETOPIC |
| | | KEY_HEARTBEAT int = 102 //处理TOPIC_HEARTBEAT |
| | | KEY_QUERY int = 103 //处理TOPIC_QUERYKEY和TOPIC_QUERYTOPIC |
| | | KEY_QUERY int = 103 //处理TOPIC_QUERYKEY和TOPIC_QUERYTOPIC和TOPIC_QUERYPROC |
| | | ) |
| | | |
| | | const ( |
| | |
| | | TOPIC_UPDATETOPIC string = "Topic_UpdateTopic" |
| | | TOPIC_QUERYKEY string = "Topic_QueryKey" |
| | | TOPIC_QUERYTOPIC string = "Topic_QueryTopic" |
| | | TOPIC_QUERYPROC string = "Topic_QueryProc" |
| | | ) |
| | | |
| | | const ( |
| | |
| | | Topic2Key map[string]int `json:"topic2Key"` //topic-replyKey的对应关系 |
| | | Status int `json:"status"` //节点状态 |
| | | } |
| | | |
| | | |
| | | //已注册的Proc进程信息 |
| | | type RegisteredClient struct { |
| | | Info RegisterInfo `json:"info"` |
| | | ReplyKey int `json:"replyKey"` |
| | | HeartbeatCount int `json:"heartbeatCount"` |
| | | DeadCount int `json:"deadCount"` |
| | | Status int `json:"status"` |
| | | } |
| | |
| | | return netNodes |
| | | } |
| | | |
| | | func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) { |
| | | r := MsgInfo{ |
| | | SrcProc: *ms.procInfo, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_QUERYPROC, |
| | | } |
| | | cr, err := ms.handle.RequestCenter(&r) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | if cr.Status == REPLY_SUCCESS && cr.Body != nil { |
| | | var list []RegisteredClient |
| | | err = json.Unmarshal(cr.Body, &list) |
| | | if err == nil { |
| | | return list, nil |
| | | } else { |
| | | ms.printLog("unmarshal to RegisteredClient list err:", err) |
| | | } |
| | | } else { |
| | | ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc) |
| | | } |
| | | return nil, fmt.Errorf("GetRegisteredClient list failed") |
| | | } |
| | | |
| | | func (ms *MicroNode) serve(msgR *MsgInfo, p int) { |
| | | var reqBody Request |
| | | err := json.Unmarshal(msgR.Body, &reqBody) |