From 81afb6ffbf7f76f49644a1832dcfe241552d7e08 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期五, 05 二月 2021 18:20:09 +0800 Subject: [PATCH] add recvandsend --- micronode.go | 75 ++++++++++++++---------- hbusc.go | 65 ++++++++++++--------- 2 files changed, 82 insertions(+), 58 deletions(-) diff --git a/hbusc.go b/hbusc.go index d0f9222..640b55c 100644 --- a/hbusc.go +++ b/hbusc.go @@ -7,7 +7,6 @@ "errors" "fmt" "os" - "strconv" "sync" "time" ) @@ -51,7 +50,6 @@ //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� chSub chan TransInfo - chReply chan TransInfo } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� @@ -86,13 +84,31 @@ } } +func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { + for { + select { + case <-ctx.Done(): + logFn("recvandsendRoutine ctx.Done") + wg.Done() + return + default: + n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� + if n != 0 { + logFn("RecvandsendTimeout success") + } else { + //time.Sleep(100 * time.Millisecond) + } + } + } +} + //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { - handle := &BHBus{ + handle := &BHBus { + ctx: ctx, conf: config, m: make(map[string]*sockServer), chSub: make(chan TransInfo, config.chSize), - chReply: make(chan TransInfo, config.chSize), } var err error @@ -211,9 +227,9 @@ sockReply := bhomebus.OpenSocket() sockReply.ForceBind(int(regR.ReplyKey)) handle.printLog("after pubTopic forceBind") - handle.wg.Add(1) + //handle.wg.Add(1) //serve server reply - go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) + //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) handle.sockRep = &sockServer{ sock: sockReply, info: &ri.Proc, @@ -468,19 +484,19 @@ return nil, fmt.Errorf("request err") } -func (h *BHBus) Reply(replyKey int, i *Reply) error { - data,err := json.Marshal(*i) - if err != nil { - return err - } - - n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) - h.printLog("reply to key:", replyKey, " n:",n) - if n != 0 { - return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) - } - return nil -} +//func (h *BHBus) Reply(replyKey int, i *Reply) error { +// data,err := json.Marshal(*i) +// if err != nil { +// return err +// } +// +// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) +// h.printLog("reply to key:", replyKey, " n:",n) +// if n != 0 { +// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) +// } +// return nil +//} //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. @@ -568,18 +584,13 @@ //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { - if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil,nil, -1 +func (h *BHBus) GetMsg() (subMsg *MsgInfo) { + if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { + return nil } if len(h.chSub) >0 { m := <-h.chSub subMsg = m.info - } - if len(h.chReply) >0 { - m := <-h.chReply - replyMsg = m.info - replyKey = m.port } return } \ No newline at end of file diff --git a/micronode.go b/micronode.go index df721e8..9fa49a2 100644 --- a/micronode.go +++ b/micronode.go @@ -90,21 +90,24 @@ ms.handlers = funcMap go ms.startHeartbeat() + //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 + go ms.startRecvSubMsg() + //浣滀负server鍚姩 + ms.serve() +} +//寮�濮嬫帴鏀惰闃呮秷鎭� +func (ms *MicroNode) startRecvSubMsg() { for { select { case <- ms.ctx.Done(): return default: - msgS, msgR, keyR := ms.handle.GetMsg() + msgS := ms.handle.GetMsg() if msgS != nil { //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� ms.printLog("Recv Sub Message:", string(msgS.Body)) ms.SubCh <- msgS - } - if msgR != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� - go ms.serve(msgR, keyR) } time.Sleep(50 * time.Millisecond) @@ -189,39 +192,49 @@ return nil, fmt.Errorf("GetRegisteredClient list failed") } -func (ms *MicroNode) serve(msgR *MsgInfo, p int) { +func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { + ri := &Reply{} if ms.handlers == nil { - return - } - - var reqBody Request - var ri *Reply - err := json.Unmarshal(msgR.Body, &reqBody) - if err != nil { - ms.printLog("serve unmarshal msgR.Body err:", err) - ri = &Reply { - Msg: err.Error(), - } + ri.Msg = "send wrong addr, check yourself!!!" } else { - ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) - - if f,ok := ms.handlers[reqBody.Path];ok { - reqBody.SrcProc = msgR.SrcProc - ri = f(&reqBody) - ms.printLog("call funcMap f,reply:", *ri) + var msgR MsgInfo + err := json.Unmarshal(rdata, &msgR) + if err != nil { + ri.Msg = err.Error() } else { - ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) - ri = &Reply{ - Success: false, - Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", - Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + var reqBody Request + err = json.Unmarshal(rdata, &msgR.Body) + if err != nil { + ri.Msg = err.Error() + } else { + ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) + if f,ok := ms.handlers[reqBody.Path];ok { + reqBody.SrcProc = msgR.SrcProc + ri = f(&reqBody) + ms.printLog("call funcMap f,reply:", *ri) + } else { + ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) + ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" + } } } } + result, err := json.Marshal(*ri) + if err != nil { + sdata = nil + } else { + sdata = &result + } + return ri.Success +} - retErr := ms.handle.Reply(p, ri) - if retErr != nil { - ms.printLog("retErr:", retErr) +func (ms *MicroNode) serve() { + if ms.handlers == nil { + return + } + for i:=0;i<10;i++ { + ms.handle.wg.Add(1) + go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) } } -- Gitblit v1.8.0