From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期日, 25 四月 2021 11:36:03 +0800
Subject: [PATCH] 使用bhsgo by lichao

---
 micronode.go |  175 +++++++++++++++-------------------------------------------
 1 files changed, 45 insertions(+), 130 deletions(-)

diff --git a/micronode.go b/micronode.go
index 89e4088..a826412 100644
--- a/micronode.go
+++ b/micronode.go
@@ -1,7 +1,7 @@
 package bhomeclient
 
 import (
-	"basic.com/valib/bhomebus.git"
+	"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
 	"context"
 	"encoding/json"
 	"errors"
@@ -20,7 +20,7 @@
 	serverId 	string
 	fnLog 		func(...interface{})
 
-	SubCh 		chan *MsgInfo
+	SubCh 		chan *bhome_msg.MsgPublish
 
 	mtx 		sync.Mutex
 	started 	bool
@@ -39,7 +39,7 @@
 		reg:      reg,
 		procInfo: &reg.Proc,
 		fnLog:    fnLog,
-		SubCh:    make(chan *MsgInfo, 512),
+		SubCh:    make(chan *bhome_msg.MsgPublish, 512),
 	}
 
 	return mn, nil
@@ -65,14 +65,6 @@
 }
 
 func (ms *MicroNode) startHeartbeat() {
-	hbi := &HeartBeatInfo{
-		HealthLevel: "health",
-		Fps:         12,
-		WarnInfo:    "warn",
-		ErrorInfo:   "error",
-		Proc:    *ms.procInfo,
-	}
-
 	t := time.NewTicker(1 * time.Second)
 	defer t.Stop()
 
@@ -81,7 +73,9 @@
 		case <-ms.ctx.Done():
 			return
 		case <-t.C:
-			ms.handle.HeartBeat(hbi)
+			ms.handle.HeartBeat()
+		default:
+			time.Sleep(500 * time.Millisecond)
 		}
 	}
 }
@@ -110,52 +104,18 @@
 			select {
 			case <- ms.ctx.Done():
 				return
+			case msgR := <-ms.handle.ChReply: //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+				go ms.serve(ms.handle.ctx, &msgR)
+			case msgS := <-ms.handle.ChSub:
+				ms.printLog("Recv Sub Message:", string(msgS.Data))
+				ms.SubCh <- &msgS
 			default:
-				msgS, msgR, keyR := ms.handle.GetMsg()
-				if msgS != nil {
-					//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-					ms.printLog("Recv Sub Message:", string(msgS.Body))
-					ms.SubCh <- msgS
-				}
-				if msgR != nil {
-					//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-					go ms.serve(msgR, keyR)
-				}
-
 				time.Sleep(50 * time.Millisecond)
 			}
 		}
-
-		//鎺ユ敹璁㈤槄鍒扮殑娑堟伅
-		//go ms.startRecvSubMsg()
-		//浣滀负server鍚姩
-		//ms.serve()
 	}
 	ms.mtx.Unlock()
 }
-
-//寮�濮嬫帴鏀惰闃呮秷鎭�
-//func (ms *MicroNode) startRecvSubMsg() {
-//	for {
-//		select {
-//		case <- ms.ctx.Done():
-//			return
-//		default:
-//			msgS, msgR, keyR := ms.handle.GetMsg()
-//			if msgS != nil {
-//				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
-//				ms.printLog("Recv Sub Message:", string(msgS.Body))
-//				ms.SubCh <- msgS
-//			}
-//			if msgR != nil {
-//				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
-//				go ms.serve(msgR, keyR)
-//			}
-//
-//			time.Sleep(50 * time.Millisecond)
-//		}
-//	}
-//}
 
 func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
 	t := time.Now()
@@ -163,9 +123,9 @@
 	ms.printLog("1:", time.Since(t))
 	t = time.Now()
 	rb, _ := json.Marshal(request)
-	msgR := &MsgInfo {
-		Topic: request.Path,
-		Body: rb,
+	msgR := &bhome_msg.MsgRequestTopic{
+		Topic: []byte(request.Path),
+		Data: rb,
 	}
 	ms.printLog("2:", time.Since(t))
 	return ms.handle.Request(serverId, msgR, milliSecs)
@@ -173,20 +133,20 @@
 
 func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
 	rb, _ := json.Marshal(request)
-	msgR := &MsgInfo{
-		Topic: request.Path,
-		Body: rb,
+	msgR := &bhome_msg.MsgRequestTopic{
+		Topic: []byte(request.Path),
+		Data: rb,
 	}
 
 	return ms.handle.Request(serverId, msgR, milliSecs)
 }
 
-func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
-	return ms.handle.RequestOnly(rData, nodes)
+func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) {
+	return ms.handle.RequestOnly(req, dest)
 }
 
 //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key  锛堢粨鏋滃彧鏈変竴涓厓绱狅級
-func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress {
 	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
 	if err != nil {
 		ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
@@ -196,7 +156,7 @@
 }
 
 //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛�   锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
-func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
+func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
 	netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
 	if err != nil {
 		return nil
@@ -205,11 +165,7 @@
 }
 
 func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
-	r := MsgInfo{
-		SrcProc: *ms.procInfo,
-		MsgType: MesgType_ReqRep,
-		Topic: TOPIC_QUERYPROC,
-	}
+	r := bhome_msg.MsgRequestTopic{}
 	cr, err := ms.handle.RequestCenter(&r)
 	if err != nil {
 		ms.printLog("requestCenter reply:", cr, "err:", err)
@@ -234,76 +190,37 @@
 	return nil, fmt.Errorf("GetRegisteredClient list failed")
 }
 
-//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
-//	ri := &Reply{}
-//	if ms.handlers == nil {
-//		ri.Msg = "send wrong addr, check yourself!!!"
-//	} else {
-//		var msgR MsgInfo
-//		err := json.Unmarshal(rdata, &msgR)
-//		if err != nil {
-//			ri.Msg = err.Error()
-//		} else {
-//			var reqBody Request
-//			err = json.Unmarshal(rdata, &msgR.Body)
-//			if err != nil {
-//				ri.Msg = err.Error()
-//			} else {
-//				ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
-//				if f,ok := ms.handlers[reqBody.Path];ok {
-//					reqBody.SrcProc = msgR.SrcProc
-//					ri = f(&reqBody)
-//					ms.printLog("call funcMap f,reply:", *ri)
-//				} else {
-//					ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
-//					ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl"
-//				}
-//			}
-//		}
-//	}
-//	result, err := json.Marshal(*ri)
-//	if err != nil {
-//		sdata = nil
-//	} else {
-//		sdata = &result
-//	}
-//	return ri.Success
-//}
-
-//func (ms *MicroNode) serve() {
-//	if ms.handlers == nil {
-//		return
-//	}
-//	for i:=0;i<10;i++ {
-//		ms.handle.wg.Add(1)
-//		go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
-//	}
-//}
-
-func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
+func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
 	if ms.handlers == nil {
 		return
 	}
 
 	var reqBody Request
 	var ri *Reply
-	err := json.Unmarshal(msgR.Body, &reqBody)
+	err := json.Unmarshal(msgR.Data, &reqBody)
 	if err != nil {
 		ms.printLog("serve unmarshal msgR.Body err:", err)
 		ri = &Reply {
 			Msg: err.Error(),
 		}
 	} else {
-		ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
+		ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
 
 		if f,ok := ms.handlers[reqBody.Path];ok {
-			reqBody.SrcProc = msgR.SrcProc
+			reqBody.SrcProc = ProcInfo{
+				ID: msgR.ProcId,
+			}
 			h := WrapperHandler{
 				ms,
 				ms,
 			}
-			ri = f(&h, &reqBody)
-			ms.printLog("call funcMap f,reply.Success:", ri.Success)
+			select {
+			case <-ctx.Done():
+				ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
+			default:
+				ri = f(&h, &reqBody)
+				ms.printLog("call funcMap f,reply.Success:", ri.Success)
+			}
 		} else {
 			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
 			ri = &Reply{
@@ -314,7 +231,7 @@
 		}
 	}
 
-	retErr := ms.handle.Reply(p, ri)
+	retErr := ms.handle.Reply(msgR.Src, ri)
 	if retErr != nil {
 		ms.printLog("retErr:", retErr)
 	}
@@ -322,24 +239,22 @@
 
 //鍙戝竷鍒版湰鏈�
 func (ms *MicroNode) Publish(topic string,msg []byte) error {
-	nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
-		Key: 8,
-	})
+	var nodes []bhome_msg.BHAddress
 	return ms.PublishNet(nodes, topic, msg)
 }
 
-func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
-	pi := &MsgInfo{
-		Topic: topic,
-		Body: msg,
+func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
+	pi := &bhome_msg.MsgPublish{
+		Topic: []byte(topic),
+		Data: data,
 	}
 	return ms.handle.Pub(nodes, pi)
 }
 
-func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
-	pi := &MsgInfo{
-		Topic: topic,
-		Body: msg,
+func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
+	pi := &bhome_msg.MsgPublish{
+		Topic: []byte(topic),
+		Data: data,
 	}
 	return ms.handle.PubTimeout(nodes, pi, timeout)
 }

--
Gitblit v1.8.0