zhangmeng
2023-12-06 6f73c447e25fa0b593f77338791051984990640c
remove for loop channel default
1个文件已修改
72 ■■■■ 已修改文件
micronode.go 72 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go
@@ -12,28 +12,28 @@
)
type MicroNode struct {
    ctx         context.Context
    handle         *BHBus
    reg         *RegisterInfo
    procInfo     *ProcInfo
    handlers     map[string]MicroFunc
    serverId     string
    fnLog         func(...interface{})
    ctx      context.Context
    handle   *BHBus
    reg      *RegisterInfo
    procInfo *ProcInfo
    handlers map[string]MicroFunc
    serverId string
    fnLog    func(...interface{})
    SubCh         chan *bhome_msg.MsgPublish
    SubCh chan *bhome_msg.MsgPublish
    mtx         sync.Mutex
    started     bool
    mtx     sync.Mutex
    started bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
    conf := NewConfig(KEY_REGISTER,512,5,60000,60000,2000, fnLog)
func NewMicroNode(ctx context.Context, q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error) {
    conf := NewConfig(KEY_REGISTER, 512, 5, 60000, 60000, 2000, fnLog)
    handle, err := Register(ctx, q, conf, reg)
    if err != nil {
        return nil, err
    }
    mn := &MicroNode {
        ctx: ctx,
    mn := &MicroNode{
        ctx:      ctx,
        serverId: serverId,
        handle:   handle,
        reg:      reg,
@@ -76,8 +76,6 @@
            return
        case <-t.C:
            h.HeartBeat()
        default:
            time.Sleep(500 * time.Millisecond)
        }
    }
}
@@ -96,21 +94,19 @@
        for {
            select {
            case <- ms.ctx.Done():
            case <-ms.ctx.Done():
                return
            case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息
                go ms.serve(ms.handle.ctx, &msgR)
            case msgS := <-ms.handle.ChSub:
                ms.SubCh <- &msgS
            default:
                time.Sleep(50 * time.Millisecond)
            }
        }
    }
    ms.mtx.Unlock()
}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply, error) {
    t := time.Now()
    ms.printLog("1:", time.Since(t))
@@ -118,17 +114,17 @@
    rb, _ := json.Marshal(request)
    msgR := &bhome_msg.MsgRequestTopic{
        Topic: []byte(request.Path),
        Data: rb,
        Data:  rb,
    }
    ms.printLog("2:", time.Since(t))
    return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply, error) {
    rb, _ := json.Marshal(request)
    msgR := &bhome_msg.MsgRequestTopic{
        Topic: []byte(request.Path),
        Data: rb,
        Data:  rb,
    }
    return ms.handle.Request(serverId, msgR, milliSecs)
@@ -142,7 +138,7 @@
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []*bhome_msg.MsgQueryTopicReply_BHNodeAddress {
    netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
    if err != nil {
        ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
        ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err)
        return nil
    }
    return netNodes
@@ -157,7 +153,7 @@
//    return netNodes
//}
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info,error) {
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
    return ms.handle.RequestCenter()
}
@@ -171,13 +167,13 @@
    err := json.Unmarshal(msgR.Data, &reqBody)
    if err != nil {
        ms.printLog("serve unmarshal msgR.Body err:", err)
        ri = &Reply {
        ri = &Reply{
            Msg: err.Error(),
        }
    } else {
        ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
        ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:", reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
        if f,ok := ms.handlers[reqBody.Path];ok {
        if f, ok := ms.handlers[reqBody.Path]; ok {
            reqBody.SrcProc = ProcInfo{
                ID: msgR.ProcId,
            }
@@ -196,8 +192,8 @@
            ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
            ri = &Reply{
                Success: false,
                Msg: "请求的接口不存在,请检查url",
                Data: "请求的接口不存在,请检查url",
                Msg:     "请求的接口不存在,请检查url",
                Data:    "请求的接口不存在,请检查url",
            }
        }
    }
@@ -211,15 +207,15 @@
}
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
func (ms *MicroNode) Publish(topic string, msg []byte) error {
    var nodes []bhome_msg.BHAddress
    return ms.PublishNet(nodes, topic, msg)
}
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string, data []byte) error {
    pi := &bhome_msg.MsgPublish{
        Topic: []byte(topic),
        Data: data,
        Data:  data,
    }
    return ms.handle.Pub(nodes, pi)
}
@@ -227,7 +223,7 @@
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
    pi := &bhome_msg.MsgPublish{
        Topic: []byte(topic),
        Data: data,
        Data:  data,
    }
    return ms.handle.PubTimeout(nodes, pi, timeout)
}
@@ -235,12 +231,12 @@
//订阅主题
func (ms *MicroNode) Subscribe(topics []string) {
    ms.handle.Sub(topics)
    for _,t := range topics {
    for _, t := range topics {
        if ms.reg.SubTopic == nil {
            ms.reg.SubTopic = make([]string, 0)
        }
        found := false
        for _,it := range ms.reg.SubTopic {
        for _, it := range ms.reg.SubTopic {
            if it == t {
                found = true
                break
@@ -258,9 +254,9 @@
    ms.handle.DeSub(topics)
    if ms.reg.SubTopic != nil {
        var leftTopics []string
        for _,t := range ms.reg.SubTopic {
        for _, t := range ms.reg.SubTopic {
            found := false
            for _,it := range topics {
            for _, it := range topics {
                if it == t {
                    found = true
                    break