From 81afb6ffbf7f76f49644a1832dcfe241552d7e08 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 05 二月 2021 18:20:09 +0800
Subject: [PATCH] add recvandsend
---
micronode.go | 75 ++++++++++++++----------
hbusc.go | 65 ++++++++++++---------
2 files changed, 82 insertions(+), 58 deletions(-)
diff --git a/hbusc.go b/hbusc.go
index d0f9222..640b55c 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -7,7 +7,6 @@
"errors"
"fmt"
"os"
- "strconv"
"sync"
"time"
)
@@ -51,7 +50,6 @@
//mtxWorker sync.Mutex //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
chSub chan TransInfo
- chReply chan TransInfo
}
//鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
@@ -86,13 +84,31 @@
}
}
+func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
+ for {
+ select {
+ case <-ctx.Done():
+ logFn("recvandsendRoutine ctx.Done")
+ wg.Done()
+ return
+ default:
+ n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+ if n != 0 {
+ logFn("RecvandsendTimeout success")
+ } else {
+ //time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }
+}
+
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
- handle := &BHBus{
+ handle := &BHBus {
+ ctx: ctx,
conf: config,
m: make(map[string]*sockServer),
chSub: make(chan TransInfo, config.chSize),
- chReply: make(chan TransInfo, config.chSize),
}
var err error
@@ -211,9 +227,9 @@
sockReply := bhomebus.OpenSocket()
sockReply.ForceBind(int(regR.ReplyKey))
handle.printLog("after pubTopic forceBind")
- handle.wg.Add(1)
+ //handle.wg.Add(1)
//serve server reply
- go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
+ //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
handle.sockRep = &sockServer{
sock: sockReply,
info: &ri.Proc,
@@ -468,19 +484,19 @@
return nil, fmt.Errorf("request err")
}
-func (h *BHBus) Reply(replyKey int, i *Reply) error {
- data,err := json.Marshal(*i)
- if err != nil {
- return err
- }
-
- n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
- h.printLog("reply to key:", replyKey, " n:",n)
- if n != 0 {
- return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
- }
- return nil
-}
+//func (h *BHBus) Reply(replyKey int, i *Reply) error {
+// data,err := json.Marshal(*i)
+// if err != nil {
+// return err
+// }
+//
+// n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
+// h.printLog("reply to key:", replyKey, " n:",n)
+// if n != 0 {
+// return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+// }
+// return nil
+//}
//鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
@@ -568,18 +584,13 @@
//鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
- if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
- return nil,nil, -1
+func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
+ if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+ return nil
}
if len(h.chSub) >0 {
m := <-h.chSub
subMsg = m.info
- }
- if len(h.chReply) >0 {
- m := <-h.chReply
- replyMsg = m.info
- replyKey = m.port
}
return
}
\ No newline at end of file
diff --git a/micronode.go b/micronode.go
index df721e8..9fa49a2 100644
--- a/micronode.go
+++ b/micronode.go
@@ -90,21 +90,24 @@
ms.handlers = funcMap
go ms.startHeartbeat()
+ //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+ go ms.startRecvSubMsg()
+ //浣滀负server鍚姩
+ ms.serve()
+}
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+func (ms *MicroNode) startRecvSubMsg() {
for {
select {
case <- ms.ctx.Done():
return
default:
- msgS, msgR, keyR := ms.handle.GetMsg()
+ msgS := ms.handle.GetMsg()
if msgS != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
ms.printLog("Recv Sub Message:", string(msgS.Body))
ms.SubCh <- msgS
- }
- if msgR != nil {
- //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
- go ms.serve(msgR, keyR)
}
time.Sleep(50 * time.Millisecond)
@@ -189,39 +192,49 @@
return nil, fmt.Errorf("GetRegisteredClient list failed")
}
-func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+ ri := &Reply{}
if ms.handlers == nil {
- return
- }
-
- var reqBody Request
- var ri *Reply
- err := json.Unmarshal(msgR.Body, &reqBody)
- if err != nil {
- ms.printLog("serve unmarshal msgR.Body err:", err)
- ri = &Reply {
- Msg: err.Error(),
- }
+ ri.Msg = "send wrong addr, check yourself!!!"
} else {
- ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
-
- if f,ok := ms.handlers[reqBody.Path];ok {
- reqBody.SrcProc = msgR.SrcProc
- ri = f(&reqBody)
- ms.printLog("call funcMap f,reply:", *ri)
+ var msgR MsgInfo
+ err := json.Unmarshal(rdata, &msgR)
+ if err != nil {
+ ri.Msg = err.Error()
} else {
- ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
- ri = &Reply{
- Success: false,
- Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
- Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ var reqBody Request
+ err = json.Unmarshal(rdata, &msgR.Body)
+ if err != nil {
+ ri.Msg = err.Error()
+ } else {
+ ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+ if f,ok := ms.handlers[reqBody.Path];ok {
+ reqBody.SrcProc = msgR.SrcProc
+ ri = f(&reqBody)
+ ms.printLog("call funcMap f,reply:", *ri)
+ } else {
+ ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+ ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+ }
}
}
}
+ result, err := json.Marshal(*ri)
+ if err != nil {
+ sdata = nil
+ } else {
+ sdata = &result
+ }
+ return ri.Success
+}
- retErr := ms.handle.Reply(p, ri)
- if retErr != nil {
- ms.printLog("retErr:", retErr)
+func (ms *MicroNode) serve() {
+ if ms.handlers == nil {
+ return
+ }
+ for i:=0;i<10;i++ {
+ ms.handle.wg.Add(1)
+ go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
}
}
--
Gitblit v1.8.0