From 6f7957c42c409624ca7a05c54bd35752f996ba68 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 07 二月 2021 15:24:10 +0800
Subject: [PATCH] rm recvandsend
---
micronode.go | 158 ++++++++++++++++++++++---------
hbusc.go | 90 +++++++++++-------
2 files changed, 166 insertions(+), 82 deletions(-)
diff --git a/hbusc.go b/hbusc.go
index 640b55c..a4a6649 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -7,6 +7,7 @@
"errors"
"fmt"
"os"
+ "strconv"
"sync"
"time"
)
@@ -50,6 +51,7 @@
//mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
chSub chan TransInfo
+ chReply chan TransInfo
}
//鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
@@ -84,23 +86,23 @@
}
}
-func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
- for {
- select {
- case <-ctx.Done():
- logFn("recvandsendRoutine ctx.Done")
- wg.Done()
- return
- default:
- n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
- if n != 0 {
- logFn("RecvandsendTimeout success")
- } else {
- //time.Sleep(100 * time.Millisecond)
- }
- }
- }
-}
+//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
+// for {
+// select {
+// case <-ctx.Done():
+// logFn("recvandsendRoutine ctx.Done")
+// wg.Done()
+// return
+// default:
+// n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+// if n != 0 {
+// logFn("RecvandsendTimeout success")
+// } else {
+// //time.Sleep(100 * time.Millisecond)
+// }
+// }
+// }
+//}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
@@ -109,6 +111,7 @@
conf: config,
m: make(map[string]*sockServer),
chSub: make(chan TransInfo, config.chSize),
+ chReply: make(chan TransInfo, config.chSize),
}
var err error
@@ -227,9 +230,9 @@
sockReply := bhomebus.OpenSocket()
sockReply.ForceBind(int(regR.ReplyKey))
handle.printLog("after pubTopic forceBind")
- //handle.wg.Add(1)
+ handle.wg.Add(1)
//serve server reply
- //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
+ go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
handle.sockRep = &sockServer{
sock: sockReply,
info: &ri.Proc,
@@ -484,19 +487,19 @@
return nil, fmt.Errorf("request err")
}
-//func (h *BHBus) Reply(replyKey int, i *Reply) error {
-// data,err := json.Marshal(*i)
-// if err != nil {
-// return err
-// }
-//
-// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
-// h.printLog("reply to key:", replyKey, " n:",n)
-// if n != 0 {
-// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
-// }
-// return nil
-//}
+func (h *BHBus) Reply(replyKey int, i *Reply) error {
+ data,err := json.Marshal(*i)
+ if err != nil {
+ return err
+ }
+
+ n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
+ h.printLog("reply to key:", replyKey, " n:",n)
+ if n != 0 {
+ return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+ }
+ return nil
+}
//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
@@ -584,13 +587,30 @@
//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
- if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
- return nil
+//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
+// if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+// return nil
+// }
+// if len(h.chSub) >0 {
+// m := <-h.chSub
+// subMsg = m.info
+// }
+// return
+//}
+
+
+func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
+ if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+ return nil, nil, -1
}
if len(h.chSub) >0 {
m := <-h.chSub
subMsg = m.info
}
+ if len(h.chReply) >0 {
+ m := <-h.chReply
+ replyMsg = m.info
+ replyKey = m.port
+ }
return
}
\ No newline at end of file
diff --git a/micronode.go b/micronode.go
index 94161b3..1a409c6 100644
--- a/micronode.go
+++ b/micronode.go
@@ -90,30 +90,58 @@
ms.handlers = funcMap
go ms.startHeartbeat()
- //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
- go ms.startRecvSubMsg()
- //浣滀负server鍚姩
- ms.serve()
-}
-//寮�濮嬫帴鏀惰闃呮秷鎭�
-func (ms *MicroNode) startRecvSubMsg() {
for {
select {
case <- ms.ctx.Done():
return
default:
- msgS := ms.handle.GetMsg()
+ msgS, msgR, keyR := ms.handle.GetMsg()
if msgS != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
ms.printLog("Recv Sub Message:", string(msgS.Body))
ms.SubCh <- msgS
}
+ if msgR != nil {
+ //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+ go ms.serve(msgR, keyR)
+ }
time.Sleep(50 * time.Millisecond)
}
}
+
+
+
+
+ //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+ //go ms.startRecvSubMsg()
+ //浣滀负server鍚姩
+ //ms.serve()
}
+
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+//func (ms *MicroNode) startRecvSubMsg() {
+// for {
+// select {
+// case <- ms.ctx.Done():
+// return
+// default:
+// msgS, msgR, keyR := ms.handle.GetMsg()
+// if msgS != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+// ms.printLog("Recv Sub Message:", string(msgS.Body))
+// ms.SubCh <- msgS
+// }
+// if msgR != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+// go ms.serve(msgR, keyR)
+// }
+//
+// time.Sleep(50 * time.Millisecond)
+// }
+// }
+//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
t := time.Now()
@@ -192,49 +220,85 @@
return nil, fmt.Errorf("GetRegisteredClient list failed")
}
-func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
- ri := &Reply{}
- if ms.handlers == nil {
- ri.Msg = "send wrong addr, check yourself!!!"
- } else {
- var msgR MsgInfo
- err := json.Unmarshal(rdata, &msgR)
- if err != nil {
- ri.Msg = err.Error()
- } else {
- var reqBody Request
- err = json.Unmarshal(rdata, &msgR.Body)
- if err != nil {
- ri.Msg = err.Error()
- } else {
- ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
- if f,ok := ms.handlers[reqBody.Path];ok {
- reqBody.SrcProc = msgR.SrcProc
- ri = f(&reqBody)
- ms.printLog("call funcMap f,reply:", *ri)
- } else {
- ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
- ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
- }
- }
- }
- }
- result, err := json.Marshal(*ri)
- if err != nil {
- sdata = nil
- } else {
- sdata = &result
- }
- return ri.Success
-}
+//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+// ri := &Reply{}
+// if ms.handlers == nil {
+// ri.Msg = "send wrong addr, check yourself!!!"
+// } else {
+// var msgR MsgInfo
+// err := json.Unmarshal(rdata, &msgR)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// var reqBody Request
+// err = json.Unmarshal(rdata, &msgR.Body)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+// if f,ok := ms.handlers[reqBody.Path];ok {
+// reqBody.SrcProc = msgR.SrcProc
+// ri = f(&reqBody)
+// ms.printLog("call funcMap f,reply:", *ri)
+// } else {
+// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+// }
+// }
+// }
+// }
+// result, err := json.Marshal(*ri)
+// if err != nil {
+// sdata = nil
+// } else {
+// sdata = &result
+// }
+// return ri.Success
+//}
-func (ms *MicroNode) serve() {
+//func (ms *MicroNode) serve() {
+// if ms.handlers == nil {
+// return
+// }
+// for i:=0;i<10;i++ {
+// ms.handle.wg.Add(1)
+// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+// }
+//}
+
+func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
if ms.handlers == nil {
return
}
- for i:=0;i<10;i++ {
- ms.handle.wg.Add(1)
- go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+
+ var reqBody Request
+ var ri *Reply
+ err := json.Unmarshal(msgR.Body, &reqBody)
+ if err != nil {
+ ms.printLog("serve unmarshal msgR.Body err:", err)
+ ri = &Reply {
+ Msg: err.Error(),
+ }
+ } else {
+ ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+
+ if f,ok := ms.handlers[reqBody.Path];ok {
+ reqBody.SrcProc = msgR.SrcProc
+ ri = f(&reqBody)
+ ms.printLog("call funcMap f,reply:", *ri)
+ } else {
+ ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+ ri = &Reply{
+ Success: false,
+ Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ }
+ }
+ }
+
+ retErr := ms.handle.Reply(p, ri)
+ if retErr != nil {
+ ms.printLog("retErr:", retErr)
}
}
--
Gitblit v1.8.0