liuxiaolong
2021-02-05 81afb6ffbf7f76f49644a1832dcfe241552d7e08
add recvandsend
2个文件已修改
140 ■■■■■ 已修改文件
hbusc.go 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -7,7 +7,6 @@
    "errors"
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
)
@@ -51,7 +50,6 @@
    //mtxWorker     sync.Mutex     //SendAndRecv可能不是线程安全的
    chSub chan TransInfo
    chReply chan TransInfo
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
@@ -86,13 +84,31 @@
    }
}
func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            logFn("recvandsendRoutine ctx.Done")
            wg.Done()
            return
        default:
            n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
            if n != 0 {
                logFn("RecvandsendTimeout success")
            } else {
                //time.Sleep(100 * time.Millisecond)
            }
        }
    }
}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
    handle := &BHBus{
    handle := &BHBus {
        ctx: ctx,
        conf: config,
        m: make(map[string]*sockServer),
        chSub: make(chan TransInfo, config.chSize),
        chReply: make(chan TransInfo, config.chSize),
    }
    var err error
@@ -211,9 +227,9 @@
        sockReply := bhomebus.OpenSocket()
        sockReply.ForceBind(int(regR.ReplyKey))
        handle.printLog("after pubTopic forceBind")
        handle.wg.Add(1)
        //handle.wg.Add(1)
        //serve server reply
        go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        handle.sockRep = &sockServer{
            sock: sockReply,
            info: &ri.Proc,
@@ -468,19 +484,19 @@
    return nil, fmt.Errorf("request err")
}
func (h *BHBus) Reply(replyKey int, i *Reply) error {
    data,err := json.Marshal(*i)
    if err != nil {
        return err
    }
    n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
    h.printLog("reply to key:", replyKey, " n:",n)
    if n != 0 {
        return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
    }
    return nil
}
//func (h *BHBus) Reply(replyKey int, i *Reply) error {
//    data,err := json.Marshal(*i)
//    if err != nil {
//        return err
//    }
//
//    n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
//    h.printLog("reply to key:", replyKey, " n:",n)
//    if n != 0 {
//        return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
//    }
//    return nil
//}
//只发送请求,不需要应答.
@@ -568,18 +584,13 @@
//获取sub 或者需要reply的消息
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
    if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil,nil, -1
func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
    if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil
    }
    if len(h.chSub) >0 {
        m  := <-h.chSub
        subMsg = m.info
    }
    if len(h.chReply) >0 {
        m := <-h.chReply
        replyMsg = m.info
        replyKey = m.port
    }
    return
}
micronode.go
@@ -90,21 +90,24 @@
    ms.handlers = funcMap
    go ms.startHeartbeat()
    //接收订阅到的消息
    go ms.startRecvSubMsg()
    //作为server启动
    ms.serve()
}
//开始接收订阅消息
func (ms *MicroNode) startRecvSubMsg() {
    for {
        select {
        case <- ms.ctx.Done():
            return
        default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            msgS := ms.handle.GetMsg()
            if msgS != nil {
                //收到其它进程的发布消息
                ms.printLog("Recv Sub Message:", string(msgS.Body))
                ms.SubCh <- msgS
            }
            if msgR != nil {
                //收到其它进程的请求消息
                go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
@@ -189,39 +192,49 @@
    return nil, fmt.Errorf("GetRegisteredClient list failed")
}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
    ri := &Reply{}
    if ms.handlers == nil {
        return
    }
    var reqBody Request
    var ri *Reply
    err := json.Unmarshal(msgR.Body, &reqBody)
    if err != nil {
        ms.printLog("serve unmarshal msgR.Body err:", err)
        ri = &Reply {
            Msg: err.Error(),
        }
        ri.Msg = "send wrong addr, check yourself!!!"
    } else {
        ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
        if f,ok := ms.handlers[reqBody.Path];ok {
            reqBody.SrcProc = msgR.SrcProc
            ri = f(&reqBody)
            ms.printLog("call funcMap f,reply:", *ri)
        var msgR MsgInfo
        err := json.Unmarshal(rdata, &msgR)
        if err != nil {
            ri.Msg = err.Error()
        } else {
            ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
            ri = &Reply{
                Success: false,
                Msg: "请求的接口不存在,请检查url",
                Data: "请求的接口不存在,请检查url",
            var reqBody Request
            err = json.Unmarshal(rdata, &msgR.Body)
            if err != nil {
                ri.Msg = err.Error()
            } else {
                ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
                if f,ok := ms.handlers[reqBody.Path];ok {
                    reqBody.SrcProc = msgR.SrcProc
                    ri = f(&reqBody)
                    ms.printLog("call funcMap f,reply:", *ri)
                } else {
                    ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
                    ri.Msg = "请求的接口不存在,请检查url"
                }
            }
        }
    }
    result, err := json.Marshal(*ri)
    if err != nil {
        sdata = nil
    } else {
        sdata = &result
    }
    return ri.Success
}
    retErr := ms.handle.Reply(p, ri)
    if retErr != nil {
        ms.printLog("retErr:", retErr)
func (ms *MicroNode) serve() {
    if ms.handlers == nil {
        return
    }
    for i:=0;i<10;i++ {
        ms.handle.wg.Add(1)
        go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
    }
}