liuxiaolong
2021-02-07 6f7957c42c409624ca7a05c54bd35752f996ba68
rm recvandsend
2个文件已修改
248 ■■■■■ 已修改文件
hbusc.go 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 158 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -7,6 +7,7 @@
    "errors"
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
)
@@ -50,6 +51,7 @@
    //mtxWorker     sync.Mutex     //SendAndRecv可能不是线程安全的
    chSub chan TransInfo
    chReply chan TransInfo
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
@@ -84,23 +86,23 @@
    }
}
func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            logFn("recvandsendRoutine ctx.Done")
            wg.Done()
            return
        default:
            n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
            if n != 0 {
                logFn("RecvandsendTimeout success")
            } else {
                //time.Sleep(100 * time.Millisecond)
            }
        }
    }
}
//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
//    for {
//        select {
//        case <-ctx.Done():
//            logFn("recvandsendRoutine ctx.Done")
//            wg.Done()
//            return
//        default:
//            n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
//            if n != 0 {
//                logFn("RecvandsendTimeout success")
//            } else {
//                //time.Sleep(100 * time.Millisecond)
//            }
//        }
//    }
//}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
@@ -109,6 +111,7 @@
        conf: config,
        m: make(map[string]*sockServer),
        chSub: make(chan TransInfo, config.chSize),
        chReply: make(chan TransInfo, config.chSize),
    }
    var err error
@@ -227,9 +230,9 @@
        sockReply := bhomebus.OpenSocket()
        sockReply.ForceBind(int(regR.ReplyKey))
        handle.printLog("after pubTopic forceBind")
        //handle.wg.Add(1)
        handle.wg.Add(1)
        //serve server reply
        //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        handle.sockRep = &sockServer{
            sock: sockReply,
            info: &ri.Proc,
@@ -484,19 +487,19 @@
    return nil, fmt.Errorf("request err")
}
//func (h *BHBus) Reply(replyKey int, i *Reply) error {
//    data,err := json.Marshal(*i)
//    if err != nil {
//        return err
//    }
//
//    n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
//    h.printLog("reply to key:", replyKey, " n:",n)
//    if n != 0 {
//        return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
//    }
//    return nil
//}
func (h *BHBus) Reply(replyKey int, i *Reply) error {
    data,err := json.Marshal(*i)
    if err != nil {
        return err
    }
    n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
    h.printLog("reply to key:", replyKey, " n:",n)
    if n != 0 {
        return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
    }
    return nil
}
//只发送请求,不需要应答.
@@ -584,13 +587,30 @@
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
    if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil
//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
//    if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
//        return nil
//    }
//    if len(h.chSub) >0 {
//        m  := <-h.chSub
//        subMsg = m.info
//    }
//    return
//}
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
    if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil, nil, -1
    }
    if len(h.chSub) >0 {
        m  := <-h.chSub
        subMsg = m.info
    }
    if len(h.chReply) >0 {
        m := <-h.chReply
        replyMsg = m.info
        replyKey = m.port
    }
    return
}
micronode.go
@@ -90,30 +90,58 @@
    ms.handlers = funcMap
    go ms.startHeartbeat()
    //接收订阅到的消息
    go ms.startRecvSubMsg()
    //作为server启动
    ms.serve()
}
//开始接收订阅消息
func (ms *MicroNode) startRecvSubMsg() {
    for {
        select {
        case <- ms.ctx.Done():
            return
        default:
            msgS := ms.handle.GetMsg()
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
                //收到其它进程的发布消息
                ms.printLog("Recv Sub Message:", string(msgS.Body))
                ms.SubCh <- msgS
            }
            if msgR != nil {
                //收到其它进程的请求消息
                go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
        }
    }
    //接收订阅到的消息
    //go ms.startRecvSubMsg()
    //作为server启动
    //ms.serve()
}
//开始接收订阅消息
//func (ms *MicroNode) startRecvSubMsg() {
//    for {
//        select {
//        case <- ms.ctx.Done():
//            return
//        default:
//            msgS, msgR, keyR := ms.handle.GetMsg()
//            if msgS != nil {
//                //收到其它进程的发布消息
//                ms.printLog("Recv Sub Message:", string(msgS.Body))
//                ms.SubCh <- msgS
//            }
//            if msgR != nil {
//                //收到其它进程的请求消息
//                go ms.serve(msgR, keyR)
//            }
//
//            time.Sleep(50 * time.Millisecond)
//        }
//    }
//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
    t := time.Now()
@@ -192,49 +220,85 @@
    return nil, fmt.Errorf("GetRegisteredClient list failed")
}
func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
    ri := &Reply{}
    if ms.handlers == nil {
        ri.Msg = "send wrong addr, check yourself!!!"
    } else {
        var msgR MsgInfo
        err := json.Unmarshal(rdata, &msgR)
        if err != nil {
            ri.Msg = err.Error()
        } else {
            var reqBody Request
            err = json.Unmarshal(rdata, &msgR.Body)
            if err != nil {
                ri.Msg = err.Error()
            } else {
                ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
                if f,ok := ms.handlers[reqBody.Path];ok {
                    reqBody.SrcProc = msgR.SrcProc
                    ri = f(&reqBody)
                    ms.printLog("call funcMap f,reply:", *ri)
                } else {
                    ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
                    ri.Msg = "请求的接口不存在,请检查url"
                }
            }
        }
    }
    result, err := json.Marshal(*ri)
    if err != nil {
        sdata = nil
    } else {
        sdata = &result
    }
    return ri.Success
}
//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
//    ri := &Reply{}
//    if ms.handlers == nil {
//        ri.Msg = "send wrong addr, check yourself!!!"
//    } else {
//        var msgR MsgInfo
//        err := json.Unmarshal(rdata, &msgR)
//        if err != nil {
//            ri.Msg = err.Error()
//        } else {
//            var reqBody Request
//            err = json.Unmarshal(rdata, &msgR.Body)
//            if err != nil {
//                ri.Msg = err.Error()
//            } else {
//                ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
//                if f,ok := ms.handlers[reqBody.Path];ok {
//                    reqBody.SrcProc = msgR.SrcProc
//                    ri = f(&reqBody)
//                    ms.printLog("call funcMap f,reply:", *ri)
//                } else {
//                    ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
//                    ri.Msg = "请求的接口不存在,请检查url"
//                }
//            }
//        }
//    }
//    result, err := json.Marshal(*ri)
//    if err != nil {
//        sdata = nil
//    } else {
//        sdata = &result
//    }
//    return ri.Success
//}
func (ms *MicroNode) serve() {
//func (ms *MicroNode) serve() {
//    if ms.handlers == nil {
//        return
//    }
//    for i:=0;i<10;i++ {
//        ms.handle.wg.Add(1)
//        go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
//    }
//}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
    if ms.handlers == nil {
        return
    }
    for i:=0;i<10;i++ {
        ms.handle.wg.Add(1)
        go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
    var reqBody Request
    var ri *Reply
    err := json.Unmarshal(msgR.Body, &reqBody)
    if err != nil {
        ms.printLog("serve unmarshal msgR.Body err:", err)
        ri = &Reply {
            Msg: err.Error(),
        }
    } else {
        ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
        if f,ok := ms.handlers[reqBody.Path];ok {
            reqBody.SrcProc = msgR.SrcProc
            ri = f(&reqBody)
            ms.printLog("call funcMap f,reply:", *ri)
        } else {
            ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
            ri = &Reply{
                Success: false,
                Msg: "请求的接口不存在,请检查url",
                Data: "请求的接口不存在,请检查url",
            }
        }
    }
    retErr := ms.handle.Reply(p, ri)
    if retErr != nil {
        ms.printLog("retErr:", retErr)
    }
}