From 6f7957c42c409624ca7a05c54bd35752f996ba68 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期日, 07 二月 2021 15:24:10 +0800 Subject: [PATCH] rm recvandsend --- micronode.go | 158 ++++++++++++++++++++++--------- hbusc.go | 90 +++++++++++------- 2 files changed, 166 insertions(+), 82 deletions(-) diff --git a/hbusc.go b/hbusc.go index 640b55c..a4a6649 100644 --- a/hbusc.go +++ b/hbusc.go @@ -7,6 +7,7 @@ "errors" "fmt" "os" + "strconv" "sync" "time" ) @@ -50,6 +51,7 @@ //mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨� chSub chan TransInfo + chReply chan TransInfo } //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲�� @@ -84,23 +86,23 @@ } } -func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { - for { - select { - case <-ctx.Done(): - logFn("recvandsendRoutine ctx.Done") - wg.Done() - return - default: - n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� - if n != 0 { - logFn("RecvandsendTimeout success") - } else { - //time.Sleep(100 * time.Millisecond) - } - } - } -} +//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { +// for { +// select { +// case <-ctx.Done(): +// logFn("recvandsendRoutine ctx.Done") +// wg.Done() +// return +// default: +// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃� +// if n != 0 { +// logFn("RecvandsendTimeout success") +// } else { +// //time.Sleep(100 * time.Millisecond) +// } +// } +// } +//} //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { @@ -109,6 +111,7 @@ conf: config, m: make(map[string]*sockServer), chSub: make(chan TransInfo, config.chSize), + chReply: make(chan TransInfo, config.chSize), } var err error @@ -227,9 +230,9 @@ sockReply := bhomebus.OpenSocket() sockReply.ForceBind(int(regR.ReplyKey)) handle.printLog("after pubTopic forceBind") - //handle.wg.Add(1) + handle.wg.Add(1) //serve server reply - //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) + go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) handle.sockRep = &sockServer{ sock: sockReply, info: &ri.Proc, @@ -484,19 +487,19 @@ return nil, fmt.Errorf("request err") } -//func (h *BHBus) Reply(replyKey int, i *Reply) error { -// data,err := json.Marshal(*i) -// if err != nil { -// return err -// } -// -// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) -// h.printLog("reply to key:", replyKey, " n:",n) -// if n != 0 { -// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) -// } -// return nil -//} +func (h *BHBus) Reply(replyKey int, i *Reply) error { + data,err := json.Marshal(*i) + if err != nil { + return err + } + + n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) + h.printLog("reply to key:", replyKey, " n:",n) + if n != 0 { + return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) + } + return nil +} //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�. @@ -584,13 +587,30 @@ //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭� -func (h *BHBus) GetMsg() (subMsg *MsgInfo) { - if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { - return nil +//func (h *BHBus) GetMsg() (subMsg *MsgInfo) { +// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { +// return nil +// } +// if len(h.chSub) >0 { +// m := <-h.chSub +// subMsg = m.info +// } +// return +//} + + +func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { + if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { + return nil, nil, -1 } if len(h.chSub) >0 { m := <-h.chSub subMsg = m.info } + if len(h.chReply) >0 { + m := <-h.chReply + replyMsg = m.info + replyKey = m.port + } return } \ No newline at end of file diff --git a/micronode.go b/micronode.go index 94161b3..1a409c6 100644 --- a/micronode.go +++ b/micronode.go @@ -90,30 +90,58 @@ ms.handlers = funcMap go ms.startHeartbeat() - //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 - go ms.startRecvSubMsg() - //浣滀负server鍚姩 - ms.serve() -} -//寮�濮嬫帴鏀惰闃呮秷鎭� -func (ms *MicroNode) startRecvSubMsg() { for { select { case <- ms.ctx.Done(): return default: - msgS := ms.handle.GetMsg() + msgS, msgR, keyR := ms.handle.GetMsg() if msgS != nil { //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� ms.printLog("Recv Sub Message:", string(msgS.Body)) ms.SubCh <- msgS } + if msgR != nil { + //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� + go ms.serve(msgR, keyR) + } time.Sleep(50 * time.Millisecond) } } + + + + + //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 + //go ms.startRecvSubMsg() + //浣滀负server鍚姩 + //ms.serve() } + +//寮�濮嬫帴鏀惰闃呮秷鎭� +//func (ms *MicroNode) startRecvSubMsg() { +// for { +// select { +// case <- ms.ctx.Done(): +// return +// default: +// msgS, msgR, keyR := ms.handle.GetMsg() +// if msgS != nil { +// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� +// ms.printLog("Recv Sub Message:", string(msgS.Body)) +// ms.SubCh <- msgS +// } +// if msgR != nil { +// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� +// go ms.serve(msgR, keyR) +// } +// +// time.Sleep(50 * time.Millisecond) +// } +// } +//} func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { t := time.Now() @@ -192,49 +220,85 @@ return nil, fmt.Errorf("GetRegisteredClient list failed") } -func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { - ri := &Reply{} - if ms.handlers == nil { - ri.Msg = "send wrong addr, check yourself!!!" - } else { - var msgR MsgInfo - err := json.Unmarshal(rdata, &msgR) - if err != nil { - ri.Msg = err.Error() - } else { - var reqBody Request - err = json.Unmarshal(rdata, &msgR.Body) - if err != nil { - ri.Msg = err.Error() - } else { - ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) - if f,ok := ms.handlers[reqBody.Path];ok { - reqBody.SrcProc = msgR.SrcProc - ri = f(&reqBody) - ms.printLog("call funcMap f,reply:", *ri) - } else { - ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) - ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" - } - } - } - } - result, err := json.Marshal(*ri) - if err != nil { - sdata = nil - } else { - sdata = &result - } - return ri.Success -} +//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { +// ri := &Reply{} +// if ms.handlers == nil { +// ri.Msg = "send wrong addr, check yourself!!!" +// } else { +// var msgR MsgInfo +// err := json.Unmarshal(rdata, &msgR) +// if err != nil { +// ri.Msg = err.Error() +// } else { +// var reqBody Request +// err = json.Unmarshal(rdata, &msgR.Body) +// if err != nil { +// ri.Msg = err.Error() +// } else { +// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) +// if f,ok := ms.handlers[reqBody.Path];ok { +// reqBody.SrcProc = msgR.SrcProc +// ri = f(&reqBody) +// ms.printLog("call funcMap f,reply:", *ri) +// } else { +// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) +// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" +// } +// } +// } +// } +// result, err := json.Marshal(*ri) +// if err != nil { +// sdata = nil +// } else { +// sdata = &result +// } +// return ri.Success +//} -func (ms *MicroNode) serve() { +//func (ms *MicroNode) serve() { +// if ms.handlers == nil { +// return +// } +// for i:=0;i<10;i++ { +// ms.handle.wg.Add(1) +// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) +// } +//} + +func (ms *MicroNode) serve(msgR *MsgInfo, p int) { if ms.handlers == nil { return } - for i:=0;i<10;i++ { - ms.handle.wg.Add(1) - go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) + + var reqBody Request + var ri *Reply + err := json.Unmarshal(msgR.Body, &reqBody) + if err != nil { + ms.printLog("serve unmarshal msgR.Body err:", err) + ri = &Reply { + Msg: err.Error(), + } + } else { + ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) + + if f,ok := ms.handlers[reqBody.Path];ok { + reqBody.SrcProc = msgR.SrcProc + ri = f(&reqBody) + ms.printLog("call funcMap f,reply:", *ri) + } else { + ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) + ri = &Reply{ + Success: false, + Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + } + } + } + + retErr := ms.handle.Reply(p, ri) + if retErr != nil { + ms.printLog("retErr:", retErr) } } -- Gitblit v1.8.0