From c3003a24e2cf4ef49db38b8b392bc7a788554fff Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 15 一月 2021 15:28:58 +0800
Subject: [PATCH] recvRoutine print n

---
 micronode.go |  125 +++++++++++++++++++++++++++++++----------
 1 files changed, 94 insertions(+), 31 deletions(-)

diff --git a/micronode.go b/micronode.go
index eafe20b..93476de 100644
--- a/micronode.go
+++ b/micronode.go
@@ -1,4 +1,4 @@
-package mc
+package bhomeclient
 
 import (
 	"basic.com/valib/bhomebus.git"
@@ -19,25 +19,23 @@
 	serverId 	string
 	fnLog 		func(...interface{})
 
-	SubChM 		map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+	SubCh 		chan *MsgInfo
 }
 
-func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
-	conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog)
+func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){
+	conf := NewConfig(KEY_REGISTER,512,5,1000,100,1000, fnLog)
 	handle, err := Register(ctx, q, conf, reg)
 	if err != nil {
 		return nil, err
 	}
 	mn := &MicroNode {
+		ctx: ctx,
 		serverId: serverId,
 		handle:   handle,
 		reg:      reg,
-		procInfo: procInfo,
-		fnLog: fnLog,
-		SubChM:   make(map[string]chan *MsgInfo),
-	}
-	for _,subTopic := range reg.SubTopic {
-		mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
+		procInfo: &reg.Proc,
+		fnLog:    fnLog,
+		SubCh:    make(chan *MsgInfo, 512),
 	}
 
 	return mn, nil
@@ -71,7 +69,7 @@
 		Proc:    *ms.procInfo,
 	}
 
-	t := time.NewTicker(time.Second)
+	t := time.NewTicker(4 * time.Second)
 	defer t.Stop()
 
 	for {
@@ -102,9 +100,7 @@
 			if msgS != nil {
 				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
 				ms.printLog("Recv Sub Message:", string(msgS.Body))
-				if ch,ok := ms.SubChM[msgS.Topic];ok {
-					ch <- msgS
-				}
+				ms.SubCh <- msgS
 			}
 			if msgR != nil {
 				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
@@ -114,13 +110,9 @@
 	}
 }
 
-func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
+func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
-	topicName := request.Header("Servicename")
 
-	if topicName == "" {
-		return nil,errors.New("Servicename 涓嶈兘涓虹┖")
-	}
 	ms.printLog("1:", time.Since(t))
 	t = time.Now()
 	rb, _ := json.Marshal(request)
@@ -130,7 +122,7 @@
 	}
 	ms.printLog("2:", time.Since(t))
 	t = time.Now()
-	mi,err := ms.handle.Request(serverId, msgR, 5000)
+	mi,err := ms.handle.Request(serverId, msgR, milliSecs)
 	if mi == nil || err != nil {
 		return nil, err
 	}
@@ -150,14 +142,14 @@
 	return ri, nil
 }
 
-func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
+func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
 	rb, _ := json.Marshal(request)
 	msgR := &MsgInfo{
 		Topic: request.Path,
 		Body: rb,
 	}
 
-	mi, err := ms.handle.Request(serverId, msgR, 5000)
+	mi, err := ms.handle.Request(serverId, msgR, milliSecs)
 	if err != nil {
 		return nil, err
 	}
@@ -174,31 +166,62 @@
 }
 
 //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key  锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
-	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
 	if err != nil {
+		ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
 		return nil
 	}
 	return netNodes
 }
 
 //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛�   锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
-	netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+	netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
 	if err != nil {
 		return nil
 	}
 	return netNodes
 }
 
+func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
+	r := MsgInfo{
+		SrcProc: *ms.procInfo,
+		MsgType: MesgType_ReqRep,
+		Topic: TOPIC_QUERYPROC,
+	}
+	cr, err := ms.handle.RequestCenter(&r)
+	if err != nil {
+		ms.printLog("requestCenter reply:", cr, "err:", err)
+		return nil, err
+	}
+	if cr.Status == REPLY_SUCCESS && cr.Body != nil {
+		var list []RegisteredClient
+		err = json.Unmarshal(cr.Body, &list)
+		if err == nil {
+			return list, nil
+		} else {
+			ms.printLog("unmarshal to RegisteredClient list err:", err)
+		}
+	} else {
+		ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc)
+	}
+	return nil, fmt.Errorf("GetRegisteredClient list failed")
+}
+
 func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+	if ms.handlers == nil {
+		return
+	}
+
 	var reqBody Request
 	err := json.Unmarshal(msgR.Body, &reqBody)
 	if err != nil {
 		ms.printLog("serve unmarshal msgR.Body err:", err)
+		return
 	}
 
-	ms.printLog("reqBody:", reqBody)
+	ms.printLog("reqBody:", reqBody, "to key: ", p)
 	var ri *Reply
 	if f,ok := ms.handlers[reqBody.Path];ok {
 		ri = f(&reqBody)
@@ -218,7 +241,10 @@
 	rMsg := MsgInfo{
 		Body: rd,
 	}
-	ms.handle.Reply(p, rMsg)
+	retErr := ms.handle.Reply(p, rMsg)
+	if retErr != nil {
+		ms.printLog("retErr:", retErr)
+	}
 }
 
 //鍙戝竷鍒版湰鏈�
@@ -235,9 +261,46 @@
 	return ms.handle.Pub(nodes, pi)
 }
 
-func (ms *MicroNode) Subscribe(topics []string) chan []byte {
-	ch := make(chan []byte)
-	return ch
+//璁㈤槄涓婚
+func (ms *MicroNode) Subscribe(topics []string) {
+	ms.handle.Sub(topics)
+	for _,t := range topics {
+		if ms.reg.SubTopic == nil {
+			ms.reg.SubTopic = make([]string, 0)
+		}
+		found := false
+		for _,it := range ms.reg.SubTopic {
+			if it == t {
+				found = true
+				break
+			}
+		}
+		if !found {
+			ms.reg.SubTopic = append(ms.reg.SubTopic, t)
+		}
+	}
+}
+
+//鍙栨秷璁㈤槄鐨勪富棰�
+func (ms *MicroNode) DeSub(topics []string) {
+	ms.printLog("DeSub topics:", topics)
+	ms.handle.DeSub(topics)
+	if ms.reg.SubTopic != nil {
+		var leftTopics []string
+		for _,t := range ms.reg.SubTopic {
+			found := false
+			for _,it := range topics {
+				if it == t {
+					found = true
+					break
+				}
+			}
+			if !found {
+				leftTopics = append(leftTopics, t)
+			}
+		}
+		ms.reg.SubTopic = leftTopics
+	}
 }
 
 //free handle

--
Gitblit v1.8.0