From 6f7957c42c409624ca7a05c54bd35752f996ba68 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 07 二月 2021 15:24:10 +0800
Subject: [PATCH] rm recvandsend

---
 micronode.go |  158 ++++++++++++++++++++++---------
 hbusc.go     |   90 +++++++++++-------
 2 files changed, 166 insertions(+), 82 deletions(-)

diff --git a/hbusc.go b/hbusc.go
index 640b55c..a4a6649 100644
--- a/hbusc.go
+++ b/hbusc.go
@@ -7,6 +7,7 @@
 	"errors"
 	"fmt"
 	"os"
+	"strconv"
 	"sync"
 	"time"
 )
@@ -50,6 +51,7 @@
 	//mtxWorker 	sync.Mutex	 //SendAndRecv鍙兘涓嶆槸绾跨▼瀹夊叏鐨�
 
 	chSub chan TransInfo
+	chReply chan TransInfo
 }
 
 //鑾峰彇鍏朵粬杩涚▼鍙戠粰姝ocket鐨勬秷鎭�,鍙互鏄痵ub鐨勬帴鏀讹紝涔熷彲浠ユ槸reply鐨勬帴鏀躲��
@@ -84,23 +86,23 @@
 	}
 }
 
-func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
-	for {
-		select {
-		case <-ctx.Done():
-			logFn("recvandsendRoutine ctx.Done")
-			wg.Done()
-			return
-		default:
-			n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
-			if n != 0 {
-				logFn("RecvandsendTimeout success")
-			} else {
-				//time.Sleep(100 * time.Millisecond)
-			}
-		}
-	}
-}
+//func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
+//	for {
+//		select {
+//		case <-ctx.Done():
+//			logFn("recvandsendRoutine ctx.Done")
+//			wg.Done()
+//			return
+//		default:
+//			n := s.RecvandsendTimeout(1000, serveFn) //鐩墠10001杩斿洖鍊艰〃绀鸿秴鏃�
+//			if n != 0 {
+//				logFn("RecvandsendTimeout success")
+//			} else {
+//				//time.Sleep(100 * time.Millisecond)
+//			}
+//		}
+//	}
+//}
 
 //Register
 func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
@@ -109,6 +111,7 @@
 		conf: config,
 		m: make(map[string]*sockServer),
 		chSub: make(chan TransInfo, config.chSize),
+		chReply: make(chan TransInfo, config.chSize),
 	}
 
 	var err error
@@ -227,9 +230,9 @@
 		sockReply := bhomebus.OpenSocket()
 		sockReply.ForceBind(int(regR.ReplyKey))
 		handle.printLog("after pubTopic forceBind")
-		//handle.wg.Add(1)
+		handle.wg.Add(1)
 		//serve server reply
-		//go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
+		go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
 		handle.sockRep = &sockServer{
 			sock: sockReply,
 			info: &ri.Proc,
@@ -484,19 +487,19 @@
 	return nil, fmt.Errorf("request err")
 }
 
-//func (h *BHBus) Reply(replyKey int, i *Reply) error {
-//	data,err := json.Marshal(*i)
-//	if err != nil {
-//		return err
-//	}
-//
-//	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
-//	h.printLog("reply to key:", replyKey, " n:",n)
-//	if n != 0 {
-//		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
-//	}
-//	return nil
-//}
+func (h *BHBus) Reply(replyKey int, i *Reply) error {
+	data,err := json.Marshal(*i)
+	if err != nil {
+		return err
+	}
+
+	n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
+	h.printLog("reply to key:", replyKey, " n:",n)
+	if n != 0 {
+		return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
+	}
+	return nil
+}
 
 
 //鍙彂閫佽姹傦紝涓嶉渶瑕佸簲绛�.
@@ -584,13 +587,30 @@
 
 
 //鑾峰彇sub 鎴栬�呴渶瑕乺eply鐨勬秷鎭�
-func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
-	if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
-		return nil
+//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
+//	if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+//		return nil
+//	}
+//	if len(h.chSub) >0 {
+//		m  := <-h.chSub
+//		subMsg = m.info
+//	}
+//	return
+//}
+
+
+func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
+	if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
+		return nil, nil, -1
 	}
 	if len(h.chSub) >0 {
 		m  := <-h.chSub
 		subMsg = m.info
 	}
+	if len(h.chReply) >0 {
+		m := <-h.chReply
+		replyMsg = m.info
+		replyKey = m.port
+	}
 	return
 }
\ No newline at end of file
diff --git a/micronode.go b/micronode.go
index 94161b3..1a409c6 100644
--- a/micronode.go
+++ b/micronode.go
@@ -90,30 +90,58 @@
 	ms.handlers = funcMap
 
 	go ms.startHeartbeat()
-	//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
-	go ms.startRecvSubMsg()
-	//浣滀负server鍚姩
-	ms.serve()
-}
 
-//寮�濮嬫帴鏀惰闃呮秷鎭�
-func (ms *MicroNode) startRecvSubMsg() {
 	for {
 		select {
 		case <- ms.ctx.Done():
 			return
 		default:
-			msgS := ms.handle.GetMsg()
+			msgS, msgR, keyR := ms.handle.GetMsg()
 			if msgS != nil {
 				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
 				ms.printLog("Recv Sub Message:", string(msgS.Body))
 				ms.SubCh <- msgS
 			}
+			if msgR != nil {
+				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+				go ms.serve(msgR, keyR)
+			}
 
 			time.Sleep(50 * time.Millisecond)
 		}
 	}
+
+
+
+
+	//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+	//go ms.startRecvSubMsg()
+	//浣滀负server鍚姩
+	//ms.serve()
 }
+
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+//func (ms *MicroNode) startRecvSubMsg() {
+//	for {
+//		select {
+//		case <- ms.ctx.Done():
+//			return
+//		default:
+//			msgS, msgR, keyR := ms.handle.GetMsg()
+//			if msgS != nil {
+//				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+//				ms.printLog("Recv Sub Message:", string(msgS.Body))
+//				ms.SubCh <- msgS
+//			}
+//			if msgR != nil {
+//				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+//				go ms.serve(msgR, keyR)
+//			}
+//
+//			time.Sleep(50 * time.Millisecond)
+//		}
+//	}
+//}
 
 func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
@@ -192,49 +220,85 @@
 	return nil, fmt.Errorf("GetRegisteredClient list failed")
 }
 
-func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
-	ri := &Reply{}
-	if ms.handlers == nil {
-		ri.Msg = "send wrong addr, check yourself!!!"
-	} else {
-		var msgR MsgInfo
-		err := json.Unmarshal(rdata, &msgR)
-		if err != nil {
-			ri.Msg = err.Error()
-		} else {
-			var reqBody Request
-			err = json.Unmarshal(rdata, &msgR.Body)
-			if err != nil {
-				ri.Msg = err.Error()
-			} else {
-				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
-				if f,ok := ms.handlers[reqBody.Path];ok {
-					reqBody.SrcProc = msgR.SrcProc
-					ri = f(&reqBody)
-					ms.printLog("call funcMap f,reply:", *ri)
-				} else {
-					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
-					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
-				}
-			}
-		}
-	}
-	result, err := json.Marshal(*ri)
-	if err != nil {
-		sdata = nil
-	} else {
-		sdata = &result
-	}
-	return ri.Success
-}
+//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+//	ri := &Reply{}
+//	if ms.handlers == nil {
+//		ri.Msg = "send wrong addr, check yourself!!!"
+//	} else {
+//		var msgR MsgInfo
+//		err := json.Unmarshal(rdata, &msgR)
+//		if err != nil {
+//			ri.Msg = err.Error()
+//		} else {
+//			var reqBody Request
+//			err = json.Unmarshal(rdata, &msgR.Body)
+//			if err != nil {
+//				ri.Msg = err.Error()
+//			} else {
+//				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+//				if f,ok := ms.handlers[reqBody.Path];ok {
+//					reqBody.SrcProc = msgR.SrcProc
+//					ri = f(&reqBody)
+//					ms.printLog("call funcMap f,reply:", *ri)
+//				} else {
+//					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+//					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+//				}
+//			}
+//		}
+//	}
+//	result, err := json.Marshal(*ri)
+//	if err != nil {
+//		sdata = nil
+//	} else {
+//		sdata = &result
+//	}
+//	return ri.Success
+//}
 
-func (ms *MicroNode) serve() {
+//func (ms *MicroNode) serve() {
+//	if ms.handlers == nil {
+//		return
+//	}
+//	for i:=0;i<10;i++ {
+//		ms.handle.wg.Add(1)
+//		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+//	}
+//}
+
+func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
 	if ms.handlers == nil {
 		return
 	}
-	for i:=0;i<10;i++ {
-		ms.handle.wg.Add(1)
-		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+
+	var reqBody Request
+	var ri *Reply
+	err := json.Unmarshal(msgR.Body, &reqBody)
+	if err != nil {
+		ms.printLog("serve unmarshal msgR.Body err:", err)
+		ri = &Reply {
+			Msg: err.Error(),
+		}
+	} else {
+		ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+
+		if f,ok := ms.handlers[reqBody.Path];ok {
+			reqBody.SrcProc = msgR.SrcProc
+			ri = f(&reqBody)
+			ms.printLog("call funcMap f,reply:", *ri)
+		} else {
+			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+			ri = &Reply{
+				Success: false,
+				Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+				Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+			}
+		}
+	}
+
+	retErr := ms.handle.Reply(p, ri)
+	if retErr != nil {
+		ms.printLog("retErr:", retErr)
 	}
 }
 

--
Gitblit v1.8.0