From a35153875f213929601a39c47f0823b310210321 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 02 四月 2021 10:37:11 +0800
Subject: [PATCH] MicroFunc添加Context参数

---
 micronode.go |  149 ++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 126 insertions(+), 23 deletions(-)

diff --git a/micronode.go b/micronode.go
index 3d83727..b7acf0d 100644
--- a/micronode.go
+++ b/micronode.go
@@ -7,6 +7,7 @@
 	"errors"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 )
 
@@ -20,10 +21,13 @@
 	fnLog 		func(...interface{})
 
 	SubCh 		chan *MsgInfo
+
+	mtx 		sync.Mutex
+	started 	bool
 }
 
 func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
-	conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog)
+	conf := NewConfig(KEY_REGISTER,512,5,5000,5000,2000, fnLog)
 	handle, err := Register(ctx, q, conf, reg)
 	if err != nil {
 		return nil, err
@@ -83,34 +87,75 @@
 }
 
 func (ms *MicroNode) StartClient() {
-	go ms.startHeartbeat()
+	ms.mtx.Lock()
+	defer ms.mtx.Unlock()
+	if !ms.started {
+		ms.started = true
+
+		go ms.startHeartbeat()
+	}
 }
 
 func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
-	ms.handlers = funcMap
+	ms.mtx.Lock()
+	if !ms.started {
+		ms.started = true
+		ms.mtx.Unlock()
 
-	go ms.startHeartbeat()
+		ms.handlers = funcMap
 
-	for {
-		select {
-		case <- ms.ctx.Done():
-			return
-		default:
-			msgS, msgR, keyR := ms.handle.GetMsg()
-			if msgS != nil {
-				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-				ms.printLog("Recv Sub Message:", string(msgS.Body))
-				ms.SubCh <- msgS
+		go ms.startHeartbeat()
+
+		for {
+			select {
+			case <- ms.ctx.Done():
+				return
+			default:
+				msgS, msgR, keyR := ms.handle.GetMsg()
+				if msgS != nil {
+					//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+					ms.printLog("Recv Sub Message:", string(msgS.Body))
+					ms.SubCh <- msgS
+				}
+				if msgR != nil {
+					//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+					go ms.serve(msgR, keyR)
+				}
+
+				time.Sleep(50 * time.Millisecond)
 			}
-			if msgR != nil {
-				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-				go ms.serve(msgR, keyR)
-			}
-
-			time.Sleep(50 * time.Millisecond)
 		}
+
+		//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+		//go ms.startRecvSubMsg()
+		//浣滀负server鍚姩
+		//ms.serve()
 	}
+	ms.mtx.Unlock()
 }
+
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+//func (ms *MicroNode) startRecvSubMsg() {
+//	for {
+//		select {
+//		case <- ms.ctx.Done():
+//			return
+//		default:
+//			msgS, msgR, keyR := ms.handle.GetMsg()
+//			if msgS != nil {
+//				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+//				ms.printLog("Recv Sub Message:", string(msgS.Body))
+//				ms.SubCh <- msgS
+//			}
+//			if msgR != nil {
+//				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+//				go ms.serve(msgR, keyR)
+//			}
+//
+//			time.Sleep(50 * time.Millisecond)
+//		}
+//	}
+//}
 
 func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
@@ -189,6 +234,52 @@
 	return nil, fmt.Errorf("GetRegisteredClient list failed")
 }
 
+//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+//	ri := &Reply{}
+//	if ms.handlers == nil {
+//		ri.Msg = "send wrong addr, check yourself!!!"
+//	} else {
+//		var msgR MsgInfo
+//		err := json.Unmarshal(rdata, &msgR)
+//		if err != nil {
+//			ri.Msg = err.Error()
+//		} else {
+//			var reqBody Request
+//			err = json.Unmarshal(rdata, &msgR.Body)
+//			if err != nil {
+//				ri.Msg = err.Error()
+//			} else {
+//				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+//				if f,ok := ms.handlers[reqBody.Path];ok {
+//					reqBody.SrcProc = msgR.SrcProc
+//					ri = f(&reqBody)
+//					ms.printLog("call funcMap f,reply:", *ri)
+//				} else {
+//					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+//					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+//				}
+//			}
+//		}
+//	}
+//	result, err := json.Marshal(*ri)
+//	if err != nil {
+//		sdata = nil
+//	} else {
+//		sdata = &result
+//	}
+//	return ri.Success
+//}
+
+//func (ms *MicroNode) serve() {
+//	if ms.handlers == nil {
+//		return
+//	}
+//	for i:=0;i<10;i++ {
+//		ms.handle.wg.Add(1)
+//		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+//	}
+//}
+
 func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
 	if ms.handlers == nil {
 		return
@@ -203,12 +294,16 @@
 			Msg: err.Error(),
 		}
 	} else {
-		ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+		ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
 
 		if f,ok := ms.handlers[reqBody.Path];ok {
 			reqBody.SrcProc = msgR.SrcProc
-			ri = f(&reqBody)
-			ms.printLog("call funcMap f,reply:", *ri)
+			ctx := Context{
+				ms,
+				ms,
+			}
+			ri = f(&ctx, &reqBody)
+			ms.printLog("call funcMap f,reply.Success:", ri.Success)
 		} else {
 			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
 			ri = &Reply{
@@ -241,6 +336,14 @@
 	return ms.handle.Pub(nodes, pi)
 }
 
+func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
+	pi := &MsgInfo{
+		Topic: topic,
+		Body: msg,
+	}
+	return ms.handle.PubTimeout(nodes, pi, timeout)
+}
+
 //璁㈤槄涓婚
 func (ms *MicroNode) Subscribe(topics []string) {
 	ms.handle.Sub(topics)

--
Gitblit v1.8.0