From c4d6793fedf752d89adc18862300dc4e07d18cb3 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期一, 08 二月 2021 13:56:16 +0800
Subject: [PATCH] 超时调整为2s
---
micronode.go | 271 +++++++++++++++++++++++++++++++++++++----------------
1 files changed, 188 insertions(+), 83 deletions(-)
diff --git a/micronode.go b/micronode.go
index 46731d7..06580ed 100644
--- a/micronode.go
+++ b/micronode.go
@@ -19,25 +19,23 @@
serverId string
fnLog func(...interface{})
- SubChM map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+ SubCh chan *MsgInfo
}
-func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
- conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog)
+func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
+ conf := NewConfig(KEY_REGISTER,512,5,2000,2000,2000, fnLog)
handle, err := Register(ctx, q, conf, reg)
if err != nil {
return nil, err
}
mn := &MicroNode {
+ ctx: ctx,
serverId: serverId,
handle: handle,
reg: reg,
- procInfo: procInfo,
- fnLog: fnLog,
- SubChM: make(map[string]chan *MsgInfo),
- }
- for _,subTopic := range reg.SubTopic {
- mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
+ procInfo: ®.Proc,
+ fnLog: fnLog,
+ SubCh: make(chan *MsgInfo, 512),
}
return mn, nil
@@ -71,7 +69,7 @@
Proc: *ms.procInfo,
}
- t := time.NewTicker(time.Second)
+ t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
@@ -102,25 +100,52 @@
if msgS != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
ms.printLog("Recv Sub Message:", string(msgS.Body))
- if ch,ok := ms.SubChM[msgS.Topic];ok {
- ch <- msgS
- }
+ ms.SubCh <- msgS
}
if msgR != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
go ms.serve(msgR, keyR)
}
+
+ time.Sleep(50 * time.Millisecond)
}
}
+
+
+
+
+ //鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+ //go ms.startRecvSubMsg()
+ //浣滀负server鍚姩
+ //ms.serve()
}
-func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
- t := time.Now()
- topicName := request.Header("Servicename")
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+//func (ms *MicroNode) startRecvSubMsg() {
+// for {
+// select {
+// case <- ms.ctx.Done():
+// return
+// default:
+// msgS, msgR, keyR := ms.handle.GetMsg()
+// if msgS != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+// ms.printLog("Recv Sub Message:", string(msgS.Body))
+// ms.SubCh <- msgS
+// }
+// if msgR != nil {
+// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+// go ms.serve(msgR, keyR)
+// }
+//
+// time.Sleep(50 * time.Millisecond)
+// }
+// }
+//}
- if topicName == "" {
- return nil,errors.New("Servicename 涓嶈兘涓虹┖")
- }
+func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
+ t := time.Now()
+
ms.printLog("1:", time.Since(t))
t = time.Now()
rb, _ := json.Marshal(request)
@@ -129,62 +154,36 @@
Body: rb,
}
ms.printLog("2:", time.Since(t))
- t = time.Now()
- mi,err := ms.handle.Request(serverId, msgR, 5000)
- if mi == nil || err != nil {
- return nil, err
- }
- ms.printLog("3:", time.Since(t))
- t = time.Now()
- ri := new(Reply)
- err = json.Unmarshal(mi.Body, ri)
- if err != nil {
- ms.printLog("unmarshal mi.Body err:", err)
- ri = &Reply{
- Success: false,
- Msg: "鏈嶅姟璇锋眰澶辫触",
- Data: "鏈嶅姟璇锋眰澶辫触",
- }
- }
- ms.printLog("4:", time.Since(t))
- return ri, nil
+ return ms.handle.Request(serverId, msgR, milliSecs)
}
-func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
+func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
rb, _ := json.Marshal(request)
msgR := &MsgInfo{
Topic: request.Path,
Body: rb,
}
- mi, err := ms.handle.Request(serverId, msgR, 5000)
- if err != nil {
- return nil, err
- }
- var ri *Reply
- err = json.Unmarshal(mi.Body, ri)
- if err != nil {
- ri = &Reply{
- Success: false,
- Msg: "鏈嶅姟璇锋眰澶辫触",
- Data: "鏈嶅姟璇锋眰澶辫触",
- }
- }
- return ri, nil
+ return ms.handle.Request(serverId, msgR, milliSecs)
+}
+
+func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
+ return ms.handle.RequestOnly(rData, nodes)
}
//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
if err != nil {
+ ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
return nil
}
return netNodes
}
//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
if err != nil {
return nil
}
@@ -199,55 +198,115 @@
}
cr, err := ms.handle.RequestCenter(&r)
if err != nil {
+ ms.printLog("requestCenter reply:", cr, "err:", err)
return nil, err
}
- if cr.Status == REPLY_SUCCESS && cr.Body != nil {
- var list []RegisteredClient
- err = json.Unmarshal(cr.Body, &list)
+ if cr.Success {
+ rd,err := json.Marshal(cr.Data)
if err == nil {
- return list, nil
+ var list []RegisteredClient
+ err = json.Unmarshal(rd, &list)
+ if err == nil {
+ return list, nil
+ } else {
+ ms.printLog("unmarshal to RegisteredClient list err:", err)
+ }
} else {
- ms.printLog("unmarshal to RegisteredClient list err:", err)
+ return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
}
} else {
- ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc)
+ ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data)
}
return nil, fmt.Errorf("GetRegisteredClient list failed")
}
+//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+// ri := &Reply{}
+// if ms.handlers == nil {
+// ri.Msg = "send wrong addr, check yourself!!!"
+// } else {
+// var msgR MsgInfo
+// err := json.Unmarshal(rdata, &msgR)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// var reqBody Request
+// err = json.Unmarshal(rdata, &msgR.Body)
+// if err != nil {
+// ri.Msg = err.Error()
+// } else {
+// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+// if f,ok := ms.handlers[reqBody.Path];ok {
+// reqBody.SrcProc = msgR.SrcProc
+// ri = f(&reqBody)
+// ms.printLog("call funcMap f,reply:", *ri)
+// } else {
+// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+// }
+// }
+// }
+// }
+// result, err := json.Marshal(*ri)
+// if err != nil {
+// sdata = nil
+// } else {
+// sdata = &result
+// }
+// return ri.Success
+//}
+
+//func (ms *MicroNode) serve() {
+// if ms.handlers == nil {
+// return
+// }
+// for i:=0;i<10;i++ {
+// ms.handle.wg.Add(1)
+// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
+// }
+//}
+
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+ if ms.handlers == nil {
+ return
+ }
+
var reqBody Request
+ var ri *Reply
err := json.Unmarshal(msgR.Body, &reqBody)
if err != nil {
ms.printLog("serve unmarshal msgR.Body err:", err)
- }
-
- ms.printLog("reqBody:", reqBody)
- var ri *Reply
- if f,ok := ms.handlers[reqBody.Path];ok {
- ri = f(&reqBody)
- ms.printLog("call funcMap f,reply:", *ri)
+ ri = &Reply {
+ Msg: err.Error(),
+ }
} else {
- ms.printLog("ms.funcMap not eixst path")
- ri = &Reply{
- Success: false,
- Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
- Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+
+ if f,ok := ms.handlers[reqBody.Path];ok {
+ reqBody.SrcProc = msgR.SrcProc
+ ri = f(&reqBody)
+ ms.printLog("call funcMap f,reply:", *ri)
+ } else {
+ ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+ ri = &Reply{
+ Success: false,
+ Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ }
}
}
- rd,err := json.Marshal(*ri)
- if err != nil {
- ms.printLog("marshal *ri err:", err)
+
+ retErr := ms.handle.Reply(p, ri)
+ if retErr != nil {
+ ms.printLog("retErr:", retErr)
}
- rMsg := MsgInfo{
- Body: rd,
- }
- ms.handle.Reply(p, rMsg)
}
//鍙戝竷鍒版湰鏈�
func (ms *MicroNode) Publish(topic string,msg []byte) error {
- nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
+ nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+ Key: 8,
+ })
return ms.PublishNet(nodes, topic, msg)
}
@@ -259,8 +318,54 @@
return ms.handle.Pub(nodes, pi)
}
+func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
+ pi := &MsgInfo{
+ Topic: topic,
+ Body: msg,
+ }
+ return ms.handle.PubTimeout(nodes, pi, timeout)
+}
+
+//璁㈤槄涓婚
func (ms *MicroNode) Subscribe(topics []string) {
ms.handle.Sub(topics)
+ for _,t := range topics {
+ if ms.reg.SubTopic == nil {
+ ms.reg.SubTopic = make([]string, 0)
+ }
+ found := false
+ for _,it := range ms.reg.SubTopic {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ ms.reg.SubTopic = append(ms.reg.SubTopic, t)
+ }
+ }
+}
+
+//鍙栨秷璁㈤槄鐨勪富棰�
+func (ms *MicroNode) DeSub(topics []string) {
+ ms.printLog("DeSub topics:", topics)
+ ms.handle.DeSub(topics)
+ if ms.reg.SubTopic != nil {
+ var leftTopics []string
+ for _,t := range ms.reg.SubTopic {
+ found := false
+ for _,it := range topics {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ leftTopics = append(leftTopics, t)
+ }
+ }
+ ms.reg.SubTopic = leftTopics
+ }
}
//free handle
--
Gitblit v1.8.0