在库里使用OpenSocket会导致Ctrl-C退出时共享内存无法释放,改为调用者传入doReq
2个文件已修改
137 ■■■■ 已修改文件
client.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sbusClient.go 126 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client.go
@@ -2,6 +2,7 @@
import (
    "basic.com/valib/bhomebus.git"
    "errors"
    "fmt"
    "strconv"
)
@@ -53,6 +54,16 @@
    }
}
var busReq = func(req []byte,nodes []bhomebus.NetNode) ([]byte, error) {
    return nil, errors.New("please init InitDoReq first")
}
func InitDoReq(fn func([]byte, []bhomebus.NetNode) ([]byte, error)) {
    if fn != nil {
        busReq = fn
    }
}
func url2Topic(serviceName string, url string) string {
    return url
}
sbusClient.go
@@ -4,7 +4,6 @@
    "basic.com/valib/bhomebus.git"
    "encoding/json"
    "errors"
    "fmt"
    "strconv"
)
@@ -25,7 +24,7 @@
    Body             []byte                `json:"body"`            //请求内容或者反馈结果
}
type request struct {
type Request struct {
    Path                    string                  `json:"path"`
    Method                  string                  `json:"method"`
    ContentType             string                  `json:"contentType"`
@@ -47,21 +46,35 @@
        return nil, errors.New("invalid netNodes")
    }
    req := request {
    req := Request{
        Path:        url,
        Method:      "GET",
        ContentType: CONTENT_TYPE_JSON,
    }
    fillParam(&req, headers, params, nil)
    return doReq(req, sc.nodes)
    rb, err := json.Marshal(req)
    if err !=nil {
        return nil,err
    }
    rMsg := MsgInfo{
        Topic: req.Path,
        Body: rb,
    }
    rData, err := json.Marshal(rMsg)
    if err != nil {
        return nil, err
    }
    return busReq(rData, sc.nodes)
    //return doReq(req, sc.nodes)
}
func (sc SBusClient) DoPostRequest(url string, contentType string, body map[string]interface{}, params map[string]string, headers map[string]string) ([]byte, error) {
    if sc.nodes == nil || len(sc.nodes) == 0 {
        return nil, errors.New("invalid port")
    }
    req := request {
    req := Request{
        Path:        url,
        Method:      "POST",
        ContentType: contentType,
@@ -96,38 +109,77 @@
        }
    }
    return doReq(req, sc.nodes)
    rb, err := json.Marshal(req)
    if err !=nil {
        return nil,err
    }
    rMsg := MsgInfo{
        Topic: req.Path,
        Body: rb,
    }
    rData, err := json.Marshal(rMsg)
    if err != nil {
        return nil, err
    }
    return busReq(rData, sc.nodes)
}
func (sc SBusClient) DoPutRequest(url string, contentType string, body map[string]interface{}, headers map[string]string) ([]byte, error) {
    if sc.nodes == nil || len(sc.nodes) == 0 {
        return nil, errors.New("invalid port")
    }
    req := request {
    req := Request{
        Path:        url,
        Method:      "PUT",
        ContentType: contentType,
    }
    fillParam(&req, headers, nil, body)
    return doReq(req, sc.nodes)
    rb, err := json.Marshal(req)
    if err !=nil {
        return nil,err
    }
    rMsg := MsgInfo{
        Topic: req.Path,
        Body: rb,
    }
    rData, err := json.Marshal(rMsg)
    if err != nil {
        return nil, err
    }
    return busReq(rData, sc.nodes)
}
func (sc SBusClient) DoDeleteRequest(url string, contentType string, body map[string]interface{}, headers map[string]string) ([]byte, error) {
    if sc.nodes == nil || len(sc.nodes) == 0 {
        return nil, errors.New("invalid port")
    }
    req := request {
    req := Request{
        Path:        url,
        Method:      "DELETE",
        ContentType: contentType,
    }
    fillParam(&req, headers, nil, body)
    return doReq(req, sc.nodes)
    rb, err := json.Marshal(req)
    if err !=nil {
        return nil,err
    }
    rMsg := MsgInfo{
        Topic: req.Path,
        Body: rb,
    }
    rData, err := json.Marshal(rMsg)
    if err != nil {
        return nil, err
    }
    return busReq(rData, sc.nodes)
}
func fillParam(req *request,headers map[string]string, params map[string]string, body map[string]interface{}) {
func fillParam(req *Request,headers map[string]string, params map[string]string, body map[string]interface{}) {
    headerMap := make(map[string][]string)
    queryMap := make(map[string][]string)
    if headers != nil {
@@ -151,28 +203,30 @@
}
func doReq(req request, nodes []bhomebus.NetNode) ([]byte,error) {
    rb, err := json.Marshal(req)
    if err !=nil {
        return nil,err
    }
    rMsg := MsgInfo{
        Topic: req.Path,
        Body: rb,
    }
    data, err := json.Marshal(rMsg)
    if err != nil {
        return nil, err
    }
    s := bhomebus.OpenSocket()
    defer s.Close()
    var ret []bhomebus.Mesg
    if n := s.SendandrecvTimeout(nodes, data, &ret, 5000);n == 0 {  //n==0表示没有请求成功
        return nil, fmt.Errorf("doReq s.SendandrecvTimeout result n:%d", n)
    } else {
        if len(ret) > 0 {
            return ret[0].Data, nil
        }
        return nil, fmt.Errorf("no any response")
    }
}
//在此处使用OpenSocket会在Ctrl-C的时候,导致socket并未成功Close,共享内存块不会释放.
//所以控制共享内存块的成功回收需要在上层做,然后调InitDoReq将函数指针传递进来
//func doReq(req Request, nodes []bhomebus.NetNode) ([]byte,error) {
//    rb, err := json.Marshal(req)
//    if err !=nil {
//        return nil,err
//    }
//    rMsg := MsgInfo{
//        Topic: req.Path,
//        Body: rb,
//    }
//    data, err := json.Marshal(rMsg)
//    if err != nil {
//        return nil, err
//    }
//    s := bhomebus.OpenSocket()
//    defer s.Close()
//    var ret []bhomebus.Mesg
//    if n := s.SendandrecvTimeout(nodes, data, &ret, 5000);n == 0 {  //n==0表示没有请求成功
//        return nil, fmt.Errorf("doReq s.SendandrecvTimeout result n:%d", n)
//    } else {
//        if len(ret) > 0 {
//            return ret[0].Data, nil
//        }
//        return nil, fmt.Errorf("no any response")
//    }
//}