From 44ed992c5d12b985c474c877a68439d5e1d77e3a Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期五, 08 一月 2021 11:05:55 +0800
Subject: [PATCH] add log
---
micronode.go | 98 ++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 81 insertions(+), 17 deletions(-)
diff --git a/micronode.go b/micronode.go
index eafe20b..cceea93 100644
--- a/micronode.go
+++ b/micronode.go
@@ -1,4 +1,4 @@
-package mc
+package bhomeclient
import (
"basic.com/valib/bhomebus.git"
@@ -19,7 +19,7 @@
serverId string
fnLog func(...interface{})
- SubChM map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+ SubCh chan *MsgInfo
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
@@ -29,15 +29,13 @@
return nil, err
}
mn := &MicroNode {
+ ctx: ctx,
serverId: serverId,
handle: handle,
reg: reg,
procInfo: procInfo,
- fnLog: fnLog,
- SubChM: make(map[string]chan *MsgInfo),
- }
- for _,subTopic := range reg.SubTopic {
- mn.SubChM[subTopic] = make(chan *MsgInfo, 512)
+ fnLog: fnLog,
+ SubCh: make(chan *MsgInfo, 512),
}
return mn, nil
@@ -102,9 +100,7 @@
if msgS != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
ms.printLog("Recv Sub Message:", string(msgS.Body))
- if ch,ok := ms.SubChM[msgS.Topic];ok {
- ch <- msgS
- }
+ ms.SubCh <- msgS
}
if msgR != nil {
//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
@@ -174,28 +170,59 @@
}
//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
if err != nil {
+ ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
return nil
}
return netNodes
}
//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
- netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
if err != nil {
return nil
}
return netNodes
}
+func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
+ r := MsgInfo{
+ SrcProc: *ms.procInfo,
+ MsgType: MesgType_ReqRep,
+ Topic: TOPIC_QUERYPROC,
+ }
+ cr, err := ms.handle.RequestCenter(&r)
+ if err != nil {
+ ms.printLog("requestCenter reply:", cr, "err:", err)
+ return nil, err
+ }
+ if cr.Status == REPLY_SUCCESS && cr.Body != nil {
+ var list []RegisteredClient
+ err = json.Unmarshal(cr.Body, &list)
+ if err == nil {
+ return list, nil
+ } else {
+ ms.printLog("unmarshal to RegisteredClient list err:", err)
+ }
+ } else {
+ ms.printLog("request center failed,status:", cr.Status, "desc:", cr.Desc)
+ }
+ return nil, fmt.Errorf("GetRegisteredClient list failed")
+}
+
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+ if ms.handlers == nil {
+ return
+ }
+
var reqBody Request
err := json.Unmarshal(msgR.Body, &reqBody)
if err != nil {
ms.printLog("serve unmarshal msgR.Body err:", err)
+ return
}
ms.printLog("reqBody:", reqBody)
@@ -235,9 +262,46 @@
return ms.handle.Pub(nodes, pi)
}
-func (ms *MicroNode) Subscribe(topics []string) chan []byte {
- ch := make(chan []byte)
- return ch
+//璁㈤槄涓婚
+func (ms *MicroNode) Subscribe(topics []string) {
+ ms.handle.Sub(topics)
+ for _,t := range topics {
+ if ms.reg.SubTopic == nil {
+ ms.reg.SubTopic = make([]string, 0)
+ }
+ found := false
+ for _,it := range ms.reg.SubTopic {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ ms.reg.SubTopic = append(ms.reg.SubTopic, t)
+ }
+ }
+}
+
+//鍙栨秷璁㈤槄鐨勪富棰�
+func (ms *MicroNode) DeSub(topics []string) {
+ ms.printLog("DeSub topics:", topics)
+ ms.handle.DeSub(topics)
+ if ms.reg.SubTopic != nil {
+ var leftTopics []string
+ for _,t := range ms.reg.SubTopic {
+ found := false
+ for _,it := range topics {
+ if it == t {
+ found = true
+ break
+ }
+ }
+ if !found {
+ leftTopics = append(leftTopics, t)
+ }
+ }
+ ms.reg.SubTopic = leftTopics
+ }
}
//free handle
--
Gitblit v1.8.0