From a35153875f213929601a39c47f0823b310210321 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 02 四月 2021 10:37:11 +0800
Subject: [PATCH] MicroFunc添加Context参数
---
micronode.go | 149 ++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 126 insertions(+), 23 deletions(-)
diff --git a/micronode.go b/micronode.go
index 3d83727..b7acf0d 100644
--- a/micronode.go
+++ b/micronode.go
@@ -7,6 +7,7 @@
"errors"
"fmt"
"os"
+ "sync"
"time"
)
@@ -20,10 +21,13 @@
fnLog func(...interface{})
SubCh chan *MsgInfo
+
+ mtx sync.Mutex
+ started bool
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
- conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog)
+ conf := NewConfig(KEY_REGISTER,512,5,5000,5000,2000, fnLog)
handle, err := Register(ctx, q, conf, reg)
if err != nil {
return nil, err
@@ -83,34 +87,75 @@
}
func (ms *MicroNode) StartClient() {
- go ms.startHeartbeat()
+ ms.mtx.Lock()
+ defer ms.mtx.Unlock()
+ if !ms.started {
+ ms.started = true
+
+ go ms.startHeartbeat()
+ }
}
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
- ms.handlers = funcMap
+ ms.mtx.Lock()
+ if !ms.started {
+ ms.started = true
+ ms.mtx.Unlock()
- go ms.startHeartbeat()
+ ms.handlers = funcMap
- for {
- select {
- case <- ms.ctx.Done():
- return
- default:
- msgS, msgR, keyR := ms.handle.GetMsg()
- if msgS != nil {
- //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
- ms.printLog("Recv Sub Message:", string(msgS.Body))
- ms.SubCh <- msgS
+ go ms.startHeartbeat()
+
+ for {
+ select {
+ case <- ms.ctx.Done():
+ return
+ default:
+ msgS, msgR, keyR := ms.handle.GetMsg()
+ if msgS != nil {
+ //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+ ms.printLog("Recv Sub Message:", string(msgS.Body))
+ ms.SubCh <- msgS
+ }
+ if msgR != nil {
+ //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+ go ms.serve(msgR, keyR)
+ }
+
+ time.Sleep(50 * time.Millisecond)
}
- if msgR != nil {
- //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
- go ms.serve(msgR, keyR)
- }
-
- time.Sleep(50 * time.Millisecond)
}
+
+ //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+ //go ms.startRecvSubMsg()
+ //浣滀负server鍚姩
+ //ms.serve()
}
+ ms.mtx.Unlock()
}
+
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+//func (ms *MicroNode) startRecvSubMsg() {
+// for {
+// select {
+// case <- ms.ctx.Done():
+// return
+// default:
+// msgS, msgR, keyR := ms.handle.GetMsg()
+// if msgS != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+// ms.printLog("Recv Sub Message:", string(msgS.Body))
+// ms.SubCh <- msgS
+// }
+// if msgR != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+// go ms.serve(msgR, keyR)
+// }
+//
+// time.Sleep(50 * time.Millisecond)
+// }
+// }
+//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
t := time.Now()
@@ -189,6 +234,52 @@
return nil, fmt.Errorf("GetRegisteredClient list failed")
}
+//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+// ri := &Reply{}
+// if ms.handlers == nil {
+// ri.Msg = "send wrong addr, check yourself!!!"
+// } else {
+// var msgR MsgInfo
+// err := json.Unmarshal(rdata, &msgR)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// var reqBody Request
+// err = json.Unmarshal(rdata, &msgR.Body)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+// if f,ok := ms.handlers[reqBody.Path];ok {
+// reqBody.SrcProc = msgR.SrcProc
+// ri = f(&reqBody)
+// ms.printLog("call funcMap f,reply:", *ri)
+// } else {
+// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+// }
+// }
+// }
+// }
+// result, err := json.Marshal(*ri)
+// if err != nil {
+// sdata = nil
+// } else {
+// sdata = &result
+// }
+// return ri.Success
+//}
+
+//func (ms *MicroNode) serve() {
+// if ms.handlers == nil {
+// return
+// }
+// for i:=0;i<10;i++ {
+// ms.handle.wg.Add(1)
+// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+// }
+//}
+
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
if ms.handlers == nil {
return
@@ -203,12 +294,16 @@
Msg: err.Error(),
}
} else {
- ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+ ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
if f,ok := ms.handlers[reqBody.Path];ok {
reqBody.SrcProc = msgR.SrcProc
- ri = f(&reqBody)
- ms.printLog("call funcMap f,reply:", *ri)
+ ctx := Context{
+ ms,
+ ms,
+ }
+ ri = f(&ctx, &reqBody)
+ ms.printLog("call funcMap f,reply.Success:", ri.Success)
} else {
ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
ri = &Reply{
@@ -241,6 +336,14 @@
return ms.handle.Pub(nodes, pi)
}
+func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
+ pi := &MsgInfo{
+ Topic: topic,
+ Body: msg,
+ }
+ return ms.handle.PubTimeout(nodes, pi, timeout)
+}
+
//璁㈤槄涓婚
func (ms *MicroNode) Subscribe(topics []string) {
ms.handle.Sub(topics)
--
Gitblit v1.8.0