去掉CommonReply,统一使用Reply结构,减少消息解封装
| | |
| | | var rMsg []bhomebus.Mesg |
| | | n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n代表成功发送的节点的个数 |
| | | if n == 1 && len(rMsg) == 1 { |
| | | var cr CommonReply |
| | | var cr Reply |
| | | if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { |
| | | handle.printLog("unmarshal regReply err:", err) |
| | | return nil, errors.New("unmarshal regReply err:"+err.Error()) |
| | | } else { |
| | | if cr.Status == REPLY_SUCCESS { |
| | | var rr RegisterReply |
| | | if err = json.Unmarshal(cr.Body, &rr);err ==nil { |
| | | regR = &rr |
| | | break loop |
| | | if cr.Success { |
| | | if rpd,err := json.Marshal(cr.Data);err ==nil { |
| | | var rr RegisterReply |
| | | if err = json.Unmarshal(rpd, &rr); err == nil { |
| | | regR = &rr |
| | | break loop |
| | | } else { |
| | | handle.printLog("unmarshal RegisterReply err:", err) |
| | | } |
| | | } else { |
| | | handle.printLog("unmarshal RegisterReply err:", err) |
| | | handle.printLog("marshal cr.Data err:", err) |
| | | } |
| | | |
| | | } else { |
| | | handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc) |
| | | handle.printLog("cr:", cr) |
| | | } |
| | | |
| | | } |
| | |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) |
| | | if n > 0 { |
| | | var reply CommonReply |
| | | var reply Reply |
| | | err = json.Unmarshal(ret[0].Data, &reply) |
| | | if err != nil { |
| | | h.printLog("unmarshal err:", err) |
| | | return nil, err |
| | | } |
| | | |
| | | if reply.Status == REPLY_SUCCESS { |
| | | err = json.Unmarshal(reply.Body, &nodes) |
| | | if reply.Success { |
| | | rd,err := json.Marshal(reply.Data) |
| | | if err == nil { |
| | | return nodes, nil |
| | | err = json.Unmarshal(rd, &nodes) |
| | | if err == nil { |
| | | return nodes, nil |
| | | } else { |
| | | h.printLog("unmarshal err:", err, "nodes:", nodes) |
| | | return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | h.printLog("unmarshal err:", err, "nodes:", nodes) |
| | | return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error()) |
| | | return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) |
| | | } |
| | | |
| | | } else { |
| | | h.printLog("reply status:", reply.Status, "desc:", reply.Desc, "body:", string(reply.Body)) |
| | | return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status) |
| | | h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data) |
| | | return nil, fmt.Errorf("REPLY msg:%s", reply.Msg) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) |
| | | } |
| | | } |
| | | |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*MsgInfo, error) { |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) { |
| | | //1.首先需要通过topic拿到本机对应的NetNode |
| | | rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) |
| | | h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err) |
| | |
| | | h.printLog("Request n: ", n, " len(ret): ", len(ret)) |
| | | |
| | | if n > 0 && len(ret) > 0 { |
| | | var resp MsgInfo |
| | | var resp Reply |
| | | if err = json.Unmarshal(ret[0].Data, &resp); err == nil { |
| | | return &resp, nil |
| | | } else { |
| | |
| | | return nil, fmt.Errorf("request err") |
| | | } |
| | | |
| | | func (h *BHBus) Reply(replyKey int, i MsgInfo) error { |
| | | data,err := json.Marshal(i) |
| | | func (h *BHBus) Reply(replyKey int, i *Reply) error { |
| | | data,err := json.Marshal(*i) |
| | | if err != nil { |
| | | return err |
| | | } |
| | |
| | | return nil |
| | | } |
| | | |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) { |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) { |
| | | data, err := json.Marshal(*req) |
| | | if err != nil { |
| | | return nil, err |
| | |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) |
| | | h.printLog("requestCenter n:", n, "len(ret):", len(ret)) |
| | | if n > 0 && len(ret) > 0{ |
| | | var cr CommonReply |
| | | var cr Reply |
| | | if err = json.Unmarshal(ret[0].Data, &cr); err == nil { |
| | | return &cr, nil |
| | | } else { |
| | |
| | | TOPIC_QUERYPROC string = "Topic_QueryProc" |
| | | ) |
| | | |
| | | const ( |
| | | REPLY_SUCCESS int = 200 |
| | | REPLY_FAILED int = 202 |
| | | ) |
| | | type CommonReply struct { |
| | | Status int `json:"status"` // 请求状态,目前只有两个,成功返回200,失败202 |
| | | Desc string `json:"desc"` // 请求状态的描述,成功"success",失败返回失败原因,如心跳服务未启动 |
| | | Body []byte `json:"body"` // 返回值的具体内容,用户约定 |
| | | } |
| | | //const ( |
| | | // REPLY_SUCCESS int = 200 |
| | | // REPLY_FAILED int = 202 |
| | | //) |
| | | //type CommonReply struct { |
| | | // Status int `json:"status"` // 请求状态,目前只有两个,成功返回200,失败202 |
| | | // Desc string `json:"desc"` // 请求状态的描述,成功"success",失败返回失败原因,如心跳服务未启动 |
| | | // Body []byte `json:"body"` // 返回值的具体内容,用户约定 |
| | | //} |
| | | |
| | | const ( |
| | | NODE_ALIVE int = 0 |
| | |
| | | Body: rb, |
| | | } |
| | | ms.printLog("2:", time.Since(t)) |
| | | t = time.Now() |
| | | mi,err := ms.handle.Request(serverId, msgR, milliSecs) |
| | | if mi == nil || err != nil { |
| | | return nil, err |
| | | } |
| | | ms.printLog("3:", time.Since(t)) |
| | | t = time.Now() |
| | | ri := new(Reply) |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ms.printLog("unmarshal mi.Body err:", err) |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | ms.printLog("4:", time.Since(t)) |
| | | return ri, nil |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | | } |
| | | |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | |
| | | Body: rb, |
| | | } |
| | | |
| | | mi, err := ms.handle.Request(serverId, msgR, milliSecs) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var ri *Reply |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | return ri, nil |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | | } |
| | | |
| | | //获取本机中某一个主题的 key (结果只有一个元素) |
| | |
| | | ms.printLog("requestCenter reply:", cr, "err:", err) |
| | | return nil, err |
| | | } |
| | | if cr.Status == REPLY_SUCCESS && cr.Body != nil { |
| | | var list []RegisteredClient |
| | | err = json.Unmarshal(cr.Body, &list) |
| | | if cr.Success { |
| | | rd,err := json.Marshal(cr.Data) |
| | | if err == nil { |
| | | return list, nil |
| | | var list []RegisteredClient |
| | | err = json.Unmarshal(rd, &list) |
| | | if err == nil { |
| | | return list, nil |
| | | } else { |
| | | ms.printLog("unmarshal to RegisteredClient list err:", err) |
| | | } |
| | | } else { |
| | | ms.printLog("unmarshal to RegisteredClient list err:", err) |
| | | return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc) |
| | | ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data) |
| | | } |
| | | return nil, fmt.Errorf("GetRegisteredClient list failed") |
| | | } |
| | |
| | | Data: "请求的接口不存在,请检查url", |
| | | } |
| | | } |
| | | rd,err := json.Marshal(*ri) |
| | | if err != nil { |
| | | ms.printLog("marshal *ri err:", err) |
| | | } |
| | | rMsg := MsgInfo{ |
| | | Body: rd, |
| | | } |
| | | retErr := ms.handle.Reply(p, rMsg) |
| | | retErr := ms.handle.Reply(p, ri) |
| | | if retErr != nil { |
| | | ms.printLog("retErr:", retErr) |
| | | } |