From c1b804b45eaa3b5c325a36ab7fcdb6b09b8cdfd4 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 29 十二月 2020 10:15:10 +0800
Subject: [PATCH] 集成MicroNode

---
 mc/micronode.go    |  249 +++++++++++++++++++++++++++++++++++
 mc/broker.go       |   13 +
 mc/constants.go    |   51 +++++++
 mc/param.go        |   70 ++++++++++
 mc/requestTopic.go |    8 +
 5 files changed, 391 insertions(+), 0 deletions(-)

diff --git a/mc/broker.go b/mc/broker.go
new file mode 100644
index 0000000..1962e27
--- /dev/null
+++ b/mc/broker.go
@@ -0,0 +1,13 @@
+package mc
+
+import "basic.com/valib/bhomebus.git"
+
+type Broker interface {
+	//鍙戝竷鍒版湰鏈�
+	Publish(topic string, msg []byte) error
+
+	//鍙戝竷鍒拌繙绋嬫満鍣�
+	PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
+
+	Subscribe(topics []string) chan []byte
+}
diff --git a/mc/constants.go b/mc/constants.go
new file mode 100644
index 0000000..aac9a4c
--- /dev/null
+++ b/mc/constants.go
@@ -0,0 +1,51 @@
+package mc
+
+const (
+	Default_Layer = -9999 //鎽勫儚鏈烘坊鍔犺繘鏉ョ殑榛樿妤煎眰
+	TYPE_LOCAL_CAMERA = 0 //鏈湴鎽勫儚鏈�
+	TYPE_GB28181_CAMERA = 1 //鍥芥爣鎽勫儚鏈�
+
+	TYPE_RUNTYPE_VIDEO = -1 //鍗曠函鐨勭洃鎺э紝涓嶅仛鍒嗘瀽
+	TYPE_RUNTYPE_POLL = 0 //杞鍋氫换鍔�
+	TYPE_RUNTYPE_REALTIME = 1 //瀹炴椂鍋氫换鍔�
+
+	Camera_Status_NoRule = 0  //鏈厤瑙勫垯
+	Camera_Status_Wait = 1  //绛夊緟澶勭悊
+	Camera_Status_Doing = 2  //澶勭悊涓�
+	Camera_Status_Other = -1 //鍏朵粬鎯呭喌
+
+	Id_Stack_Pre = "stack_"
+	Stack_Status_NoRule = 0  //鏈厤瑙勫垯
+	Stack_Status_Wait = 1  //绛夊緟澶勭悊
+	Stack_Status_Doing = 2  //澶勭悊涓�
+	Stack_Status_Done = 9  //澶勭悊瀹屾垚
+
+
+	File_Type_Img = 0
+	File_Type_Video = 1
+	File_Type_Audio = 2
+	File_Img_Id_Pre = "img_"
+	File_Video_Id_Pre = "video_"
+	File_Audio_Id_Pre = "audio_"
+	File_Other_Id_Pre = "other_"
+	File_Status_Delete = -1  			//鍒犻櫎鐘舵��
+	File_Status_Pause = 0   			//鏆傚仠鐘舵��
+	File_Status_Wait = 1   				//绛夊緟澶勭悊
+	File_Status_Doing = 2				//澶勭悊涓�
+	File_Status_Complete = 9			//宸插畬鎴�
+	File_Status_DealErr = -2			//澶勭悊澶辫触
+
+
+	Proc_Api_Gateway = "api-gateway"
+	Proc_Camera_Service = "camera-service"
+	Proc_App_Center = "appcenter-service"
+	Proc_Chanmanage_Service = "chanmanage-service"
+	Proc_CompTable_Service = "compTable-service"
+	Proc_Gb28181_Service = "gb28181-service"
+	Proc_Push_Service = "push-service"   //
+	Proc_Realtime_Service = "realtime-service" //瀹炴椂鐩戞帶
+	Proc_Scene_Service = "scene-service"   //鍦烘櫙閰嶇疆杩涚▼
+	Proc_Search_Service = "search-service" //鏌ヨ妫�绱㈣繘绋�
+	Proc_Stack_Service = "stack-service"   //鏁版嵁鏍堣繘绋�
+	Proc_System_Service = "system-service" //绯荤粺鏈嶅姟
+)
diff --git a/mc/micronode.go b/mc/micronode.go
new file mode 100644
index 0000000..95d28cf
--- /dev/null
+++ b/mc/micronode.go
@@ -0,0 +1,249 @@
+package mc
+
+import (
+	"basic.com/valib/bhomebus.git"
+	"bhomeclient"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"os"
+	"time"
+)
+
+type MicroNode struct {
+	ctx 		context.Context
+	handle 		*bhomeclient.BHBus
+	reg 		*bhomeclient.RegisterInfo
+	procInfo 	*bhomeclient.ProcInfo
+	handlers 	map[string]MicroFunc
+	serverId 	string
+	fnLog 		func(...interface{})
+
+	SubChM 		map[string]chan *bhomeclient.MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+}
+
+func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
+	conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog)
+	handle, err := bhomeclient.Register(ctx, q, conf, reg)
+	if err != nil {
+		return nil, err
+	}
+	mn := &MicroNode {
+		serverId: serverId,
+		handle:   handle,
+		reg:      reg,
+		procInfo: procInfo,
+		fnLog: fnLog,
+		SubChM:   make(map[string]chan *bhomeclient.MsgInfo),
+	}
+	for _,subTopic := range reg.SubTopic {
+		mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512)
+	}
+
+	return mn, nil
+}
+
+func (ms *MicroNode) printLog(v ...interface{}) {
+	if ms.fnLog != nil {
+		ms.fnLog(v...)
+	} else {
+		fmt.Println(v...)
+	}
+}
+
+func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) {
+	ms.handle.UpdateNodeTopics(ts)
+}
+
+func (ms *MicroNode) DeRegister() error {
+	if ms.handle != nil {
+		return ms.handle.DeRegister(ms.reg)
+	}
+	return errors.New("ms.handle is nil")
+}
+
+func (ms *MicroNode) startHeartbeat() {
+	hbi := &bhomeclient.HeartBeatInfo{
+		HealthLevel: "health",
+		Fps:         12,
+		WarnInfo:    "warn",
+		ErrorInfo:   "error",
+		Proc:    *ms.procInfo,
+	}
+
+	t := time.NewTicker(time.Second)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-ms.ctx.Done():
+			return
+		case <-t.C:
+			ms.handle.HeartBeat(hbi)
+		}
+	}
+}
+
+func (ms *MicroNode) StartClient() {
+	go ms.startHeartbeat()
+}
+
+func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
+	ms.handlers = funcMap
+
+	go ms.startHeartbeat()
+
+	for {
+		select {
+		case <- ms.ctx.Done():
+			return
+		default:
+			msgS, msgR, keyR := ms.handle.GetMsg()
+			if msgS != nil {
+				//鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+				ms.printLog("Recv Sub Message:", string(msgS.Body))
+				if ch,ok := ms.SubChM[msgS.Topic];ok {
+					ch <- msgS
+				}
+			}
+			if msgR != nil {
+				//鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+				go ms.serve(msgR, keyR)
+			}
+		}
+	}
+}
+
+func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
+	t := time.Now()
+	topicName := request.Header("Servicename")
+
+	if topicName == "" {
+		return nil,errors.New("Servicename 涓嶈兘涓虹┖")
+	}
+	ms.printLog("1:", time.Since(t))
+	t = time.Now()
+	rb, _ := json.Marshal(request)
+	msgR := &bhomeclient.MsgInfo {
+		Topic: request.Path,
+		Body: rb,
+	}
+	ms.printLog("2:", time.Since(t))
+	t = time.Now()
+	mi,err := ms.handle.Request(serverId, msgR, 5000)
+	if mi == nil || err != nil {
+		return nil, err
+	}
+	ms.printLog("3:", time.Since(t))
+	t = time.Now()
+	ri := new(Reply)
+	err = json.Unmarshal(mi.Body, ri)
+	if err != nil {
+		ms.printLog("unmarshal mi.Body err:", err)
+		ri = &Reply{
+			Success: false,
+			Msg: "鏈嶅姟璇锋眰澶辫触",
+			Data: "鏈嶅姟璇锋眰澶辫触",
+		}
+	}
+	ms.printLog("4:", time.Since(t))
+	return ri, nil
+}
+
+func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
+	rb, _ := json.Marshal(request)
+	msgR := &bhomeclient.MsgInfo{
+		Topic: request.Path,
+		Body: rb,
+	}
+
+	mi, err := ms.handle.Request(serverId, msgR, 5000)
+	if err != nil {
+		return nil, err
+	}
+	var ri *Reply
+	err = json.Unmarshal(mi.Body, ri)
+	if err != nil {
+		ri = &Reply{
+			Success: false,
+			Msg: "鏈嶅姟璇锋眰澶辫触",
+			Data: "鏈嶅姟璇锋眰澶辫触",
+		}
+	}
+	return ri, nil
+}
+
+//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key  锛堢粨鏋滃彧鏈変竴涓厓绱狅級
+func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
+	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+	if err != nil {
+		return nil
+	}
+	return netNodes
+}
+
+//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛�   锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
+func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
+	netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+	if err != nil {
+		return nil
+	}
+	return netNodes
+}
+
+func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) {
+	var reqBody Request
+	err := json.Unmarshal(msgR.Body, &reqBody)
+	if err != nil {
+		ms.printLog("serve unmarshal msgR.Body err:", err)
+	}
+
+	ms.printLog("reqBody:", reqBody)
+	var ri *Reply
+	if f,ok := ms.handlers[reqBody.Path];ok {
+		ri = f(&reqBody)
+		ms.printLog("call funcMap f,reply:", *ri)
+	} else {
+		ms.printLog("ms.funcMap not eixst path")
+		ri = &Reply{
+			Success: false,
+			Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+			Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+		}
+	}
+	rd,err := json.Marshal(*ri)
+	if err != nil {
+		ms.printLog("marshal *ri err:", err)
+	}
+	rMsg := bhomeclient.MsgInfo{
+		Body: rd,
+	}
+	ms.handle.Reply(p, rMsg)
+}
+
+//鍙戝竷鍒版湰鏈�
+func (ms *MicroNode) Publish(topic string,msg []byte) error {
+	nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
+	return ms.PublishNet(nodes, topic, msg)
+}
+
+func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
+	pi := &bhomeclient.MsgInfo{
+		Topic: topic,
+		Body: msg,
+	}
+	return ms.handle.Pub(nodes, pi)
+}
+
+func (ms *MicroNode) Subscribe(topics []string) chan []byte {
+	ch := make(chan []byte)
+	return ch
+}
+
+//free handle
+func (ms *MicroNode) Free() {
+	if ms.handle != nil {
+		ms.handle.Free()
+	}
+}
\ No newline at end of file
diff --git a/mc/param.go b/mc/param.go
new file mode 100644
index 0000000..e01c1ba
--- /dev/null
+++ b/mc/param.go
@@ -0,0 +1,70 @@
+package mc
+
+import (
+	"encoding/json"
+	"errors"
+)
+
+type Request struct {
+	Path        			string              	`json:"path"`
+	Method      			string              	`json:"method"`
+	ContentType 			string              	`json:"contentType"`
+	HeaderMap   			map[string][]string 	`json:"headerMap"`
+	QueryMap    			map[string][]string 	`json:"queryMap"`
+	FormMap     			map[string][]string 	`json:"formMap"`
+	PostFormMap 			map[string][]string 	`json:"postFormMap"`
+	Body        			[]byte              	`json:"body"`
+	File        			FileArg             	`json:"file"`
+	MultiFiles 				[]FileArg				`json:"multiFiles""`
+}
+
+type FileArg struct {
+	Name 					string 					`json:"name"`
+	Size 					int64 					`json:"size"`
+	Bytes					[]byte 					`json:"bytes"`
+}
+
+type Reply struct {
+	Success 			bool 						`json:"success"`
+	Msg 				string 						`json:"msg"`
+	Data 				interface{} 				`json:"data"`
+}
+
+func (r *Request) Header(key string) string {
+	if values, ok := r.HeaderMap[key]; ok {
+		return values[0]
+	}
+	return ""
+}
+
+func (r *Request) Query(key string) string {
+	if values, ok := r.QueryMap[key]; ok {
+		return values[0]
+	}
+	return ""
+}
+
+func (r *Request) PostForm(key string) string {
+	if values, ok := r.PostFormMap[key]; ok {
+		return values[0]
+	}
+	return ""
+}
+
+func (r *Request) BindJSON(v interface{}) error {
+	return json.Unmarshal(r.Body, &v)
+}
+
+func (r *Request) FormFile() (*FileArg, error) {
+	if r.File.Name != "" && r.File.Size >0 && r.File.Bytes !=nil {
+		return &r.File,nil
+	}
+	return nil, errors.New("file not found")
+}
+
+func (r *Request) FormFiles() (*[]FileArg, error) {
+	if r.MultiFiles != nil && len(r.MultiFiles) >0 {
+		return &r.MultiFiles, nil
+	}
+	return nil,errors.New("multi files not found")
+}
diff --git a/mc/requestTopic.go b/mc/requestTopic.go
new file mode 100644
index 0000000..b3233fd
--- /dev/null
+++ b/mc/requestTopic.go
@@ -0,0 +1,8 @@
+package mc
+
+type MicroFunc func(req *Request) *Reply
+
+
+type Transport interface {
+	RequestTopic(string, Request) (*Reply,error)
+}
\ No newline at end of file

--
Gitblit v1.8.0