From 81afb6ffbf7f76f49644a1832dcfe241552d7e08 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 05 二月 2021 18:20:09 +0800
Subject: [PATCH] add recvandsend

---
 micronode.go |  168 ++++++++++++++++++++++++++++----------------------------
 1 files changed, 84 insertions(+), 84 deletions(-)

diff --git a/micronode.go b/micronode.go
index 88f9bfd..9fa49a2 100644
--- a/micronode.go
+++ b/micronode.go
@@ -22,8 +22,8 @@
 	SubCh 		chan *MsgInfo
 }
 
-func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
-	conf := NewConfig(KEY_REGISTER,512,5,100,100,100, fnLog)
+func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
+	conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog)
 	handle, err := Register(ctx, q, conf, reg)
 	if err != nil {
 		return nil, err
@@ -33,7 +33,7 @@
 		serverId: serverId,
 		handle:   handle,
 		reg:      reg,
-		procInfo: procInfo,
+		procInfo: &reg.Proc,
 		fnLog:    fnLog,
 		SubCh:    make(chan *MsgInfo, 512),
 	}
@@ -69,7 +69,7 @@
 		Proc:    *ms.procInfo,
 	}
 
-	t := time.NewTicker(time.Second)
+	t := time.NewTicker(1 * time.Second)
 	defer t.Stop()
 
 	for {
@@ -90,33 +90,34 @@
 	ms.handlers = funcMap
 
 	go ms.startHeartbeat()
+	//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
+	go ms.startRecvSubMsg()
+	//浣滀负server鍚姩
+	ms.serve()
+}
 
+//寮�濮嬫帴鏀惰闃呮秷鎭�
+func (ms *MicroNode) startRecvSubMsg() {
 	for {
 		select {
 		case <- ms.ctx.Done():
 			return
 		default:
-			msgS, msgR, keyR := ms.handle.GetMsg()
+			msgS := ms.handle.GetMsg()
 			if msgS != nil {
 				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
 				ms.printLog("Recv Sub Message:", string(msgS.Body))
 				ms.SubCh <- msgS
 			}
-			if msgR != nil {
-				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-				go ms.serve(msgR, keyR)
-			}
+
+			time.Sleep(50 * time.Millisecond)
 		}
 	}
 }
 
-func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
+func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
-	topicName := request.Header("Servicename")
 
-	if topicName == "" {
-		return nil,errors.New("Servicename 涓嶈兘涓虹┖")
-	}
 	ms.printLog("1:", time.Since(t))
 	t = time.Now()
 	rb, _ := json.Marshal(request)
@@ -125,48 +126,21 @@
 		Body: rb,
 	}
 	ms.printLog("2:", time.Since(t))
-	t = time.Now()
-	mi,err := ms.handle.Request(serverId, msgR, 5000)
-	if mi == nil || err != nil {
-		return nil, err
-	}
-	ms.printLog("3:", time.Since(t))
-	t = time.Now()
-	ri := new(Reply)
-	err = json.Unmarshal(mi.Body, ri)
-	if err != nil {
-		ms.printLog("unmarshal mi.Body err:", err)
-		ri = &Reply{
-			Success: false,
-			Msg: "鏈嶅姟璇锋眰澶辫触",
-			Data: "鏈嶅姟璇锋眰澶辫触",
-		}
-	}
-	ms.printLog("4:", time.Since(t))
-	return ri, nil
+	return ms.handle.Request(serverId, msgR, milliSecs)
 }
 
-func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
+func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
 	rb, _ := json.Marshal(request)
 	msgR := &MsgInfo{
 		Topic: request.Path,
 		Body: rb,
 	}
 
-	mi, err := ms.handle.Request(serverId, msgR, 5000)
-	if err != nil {
-		return nil, err
-	}
-	var ri *Reply
-	err = json.Unmarshal(mi.Body, ri)
-	if err != nil {
-		ri = &Reply{
-			Success: false,
-			Msg: "鏈嶅姟璇锋眰澶辫触",
-			Data: "鏈嶅姟璇锋眰澶辫触",
-		}
-	}
-	return ri, nil
+	return ms.handle.Request(serverId, msgR, milliSecs)
+}
+
+func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
+	return ms.handle.RequestOnly(rData, nodes)
 }
 
 //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key  锛堢粨鏋滃彧鏈変竴涓厓绱狅級
@@ -199,58 +173,76 @@
 		ms.printLog("requestCenter reply:", cr, "err:", err)
 		return nil, err
 	}
-	if cr.Status == REPLY_SUCCESS && cr.Body != nil {
-		var list []RegisteredClient
-		err = json.Unmarshal(cr.Body, &list)
+	if cr.Success {
+		rd,err := json.Marshal(cr.Data)
 		if err == nil {
-			return list, nil
+			var list []RegisteredClient
+			err = json.Unmarshal(rd, &list)
+			if err == nil {
+				return list, nil
+			} else {
+				ms.printLog("unmarshal to RegisteredClient list err:", err)
+			}
 		} else {
-			ms.printLog("unmarshal to RegisteredClient list err:", err)
+			return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
 		}
 	} else {
-		ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc)
+		ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data)
 	}
 	return nil, fmt.Errorf("GetRegisteredClient list failed")
 }
 
-func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
+	ri := &Reply{}
+	if ms.handlers == nil {
+		ri.Msg = "send wrong addr, check yourself!!!"
+	} else {
+		var msgR MsgInfo
+		err := json.Unmarshal(rdata, &msgR)
+		if err != nil {
+			ri.Msg = err.Error()
+		} else {
+			var reqBody Request
+			err = json.Unmarshal(rdata, &msgR.Body)
+			if err != nil {
+				ri.Msg = err.Error()
+			} else {
+				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
+				if f,ok := ms.handlers[reqBody.Path];ok {
+					reqBody.SrcProc = msgR.SrcProc
+					ri = f(&reqBody)
+					ms.printLog("call funcMap f,reply:", *ri)
+				} else {
+					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
+					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
+				}
+			}
+		}
+	}
+	result, err := json.Marshal(*ri)
+	if err != nil {
+		sdata = nil
+	} else {
+		sdata = &result
+	}
+	return ri.Success
+}
+
+func (ms *MicroNode) serve() {
 	if ms.handlers == nil {
 		return
 	}
-
-	var reqBody Request
-	err := json.Unmarshal(msgR.Body, &reqBody)
-	if err != nil {
-		ms.printLog("serve unmarshal msgR.Body err:", err)
-		return
+	for i:=0;i<10;i++ {
+		ms.handle.wg.Add(1)
+		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
 	}
-
-	ms.printLog("reqBody:", reqBody)
-	var ri *Reply
-	if f,ok := ms.handlers[reqBody.Path];ok {
-		ri = f(&reqBody)
-		ms.printLog("call funcMap f,reply:", *ri)
-	} else {
-		ms.printLog("ms.funcMap not eixst path")
-		ri = &Reply{
-			Success: false,
-			Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
-			Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
-		}
-	}
-	rd,err := json.Marshal(*ri)
-	if err != nil {
-		ms.printLog("marshal *ri err:", err)
-	}
-	rMsg := MsgInfo{
-		Body: rd,
-	}
-	ms.handle.Reply(p, rMsg)
 }
 
 //鍙戝竷鍒版湰鏈�
 func (ms *MicroNode) Publish(topic string,msg []byte) error {
-	nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
+	nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
+		Key: 8,
+	})
 	return ms.PublishNet(nodes, topic, msg)
 }
 
@@ -262,6 +254,14 @@
 	return ms.handle.Pub(nodes, pi)
 }
 
+func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
+	pi := &MsgInfo{
+		Topic: topic,
+		Body: msg,
+	}
+	return ms.handle.PubTimeout(nodes, pi, timeout)
+}
+
 //璁㈤槄涓婚
 func (ms *MicroNode) Subscribe(topics []string) {
 	ms.handle.Sub(topics)

--
Gitblit v1.8.0