liuxiaolong
2021-04-02 a35153875f213929601a39c47f0823b310210321
MicroFunc添加Context参数
2个文件已修改
81 ■■■■■ 已修改文件
micronode.go 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
requestTopic.go 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go
@@ -7,6 +7,7 @@
    "errors"
    "fmt"
    "os"
    "sync"
    "time"
)
@@ -20,6 +21,9 @@
    fnLog         func(...interface{})
    SubCh         chan *MsgInfo
    mtx         sync.Mutex
    started     bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
@@ -83,41 +87,51 @@
}
func (ms *MicroNode) StartClient() {
    go ms.startHeartbeat()
    ms.mtx.Lock()
    defer ms.mtx.Unlock()
    if !ms.started {
        ms.started = true
        go ms.startHeartbeat()
    }
}
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
    ms.handlers = funcMap
    ms.mtx.Lock()
    if !ms.started {
        ms.started = true
        ms.mtx.Unlock()
    go ms.startHeartbeat()
        ms.handlers = funcMap
    for {
        select {
        case <- ms.ctx.Done():
            return
        default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
                //收到其它进程的发布消息
                ms.printLog("Recv Sub Message:", string(msgS.Body))
                ms.SubCh <- msgS
        go ms.startHeartbeat()
        for {
            select {
            case <- ms.ctx.Done():
                return
            default:
                msgS, msgR, keyR := ms.handle.GetMsg()
                if msgS != nil {
                    //收到其它进程的发布消息
                    ms.printLog("Recv Sub Message:", string(msgS.Body))
                    ms.SubCh <- msgS
                }
                if msgR != nil {
                    //收到其它进程的请求消息
                    go ms.serve(msgR, keyR)
                }
                time.Sleep(50 * time.Millisecond)
            }
            if msgR != nil {
                //收到其它进程的请求消息
                go ms.serve(msgR, keyR)
            }
            time.Sleep(50 * time.Millisecond)
        }
        //接收订阅到的消息
        //go ms.startRecvSubMsg()
        //作为server启动
        //ms.serve()
    }
    //接收订阅到的消息
    //go ms.startRecvSubMsg()
    //作为server启动
    //ms.serve()
    ms.mtx.Unlock()
}
//开始接收订阅消息
@@ -284,7 +298,11 @@
        if f,ok := ms.handlers[reqBody.Path];ok {
            reqBody.SrcProc = msgR.SrcProc
            ri = f(&reqBody)
            ctx := Context{
                ms,
                ms,
            }
            ri = f(&ctx, &reqBody)
            ms.printLog("call funcMap f,reply.Success:", ri.Success)
        } else {
            ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
requestTopic.go
@@ -1,6 +1,11 @@
package bhomeclient
type MicroFunc func(req *Request) *Reply
type Context struct {
    Bk         Broker
    Tr         Transport
}
type MicroFunc func(ctx *Context, req *Request) *Reply
type Transport interface {