From c1b804b45eaa3b5c325a36ab7fcdb6b09b8cdfd4 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 29 十二月 2020 10:15:10 +0800
Subject: [PATCH] 集成MicroNode
---
mc/micronode.go | 249 +++++++++++++++++++++++++++++++++++
mc/broker.go | 13 +
mc/constants.go | 51 +++++++
mc/param.go | 70 ++++++++++
mc/requestTopic.go | 8 +
5 files changed, 391 insertions(+), 0 deletions(-)
diff --git a/mc/broker.go b/mc/broker.go
new file mode 100644
index 0000000..1962e27
--- /dev/null
+++ b/mc/broker.go
@@ -0,0 +1,13 @@
+package mc
+
+import "basic.com/valib/bhomebus.git"
+
+type Broker interface {
+ //鍙戝竷鍒版湰鏈�
+ Publish(topic string, msg []byte) error
+
+ //鍙戝竷鍒拌繙绋嬫満鍣�
+ PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
+
+ Subscribe(topics []string) chan []byte
+}
diff --git a/mc/constants.go b/mc/constants.go
new file mode 100644
index 0000000..aac9a4c
--- /dev/null
+++ b/mc/constants.go
@@ -0,0 +1,51 @@
+package mc
+
+const (
+ Default_Layer = -9999 //鎽勫儚鏈烘坊鍔犺繘鏉ョ殑榛樿妤煎眰
+ TYPE_LOCAL_CAMERA = 0 //鏈湴鎽勫儚鏈�
+ TYPE_GB28181_CAMERA = 1 //鍥芥爣鎽勫儚鏈�
+
+ TYPE_RUNTYPE_VIDEO = -1 //鍗曠函鐨勭洃鎺э紝涓嶅仛鍒嗘瀽
+ TYPE_RUNTYPE_POLL = 0 //杞鍋氫换鍔�
+ TYPE_RUNTYPE_REALTIME = 1 //瀹炴椂鍋氫换鍔�
+
+ Camera_Status_NoRule = 0 //鏈厤瑙勫垯
+ Camera_Status_Wait = 1 //绛夊緟澶勭悊
+ Camera_Status_Doing = 2 //澶勭悊涓�
+ Camera_Status_Other = -1 //鍏朵粬鎯呭喌
+
+ Id_Stack_Pre = "stack_"
+ Stack_Status_NoRule = 0 //鏈厤瑙勫垯
+ Stack_Status_Wait = 1 //绛夊緟澶勭悊
+ Stack_Status_Doing = 2 //澶勭悊涓�
+ Stack_Status_Done = 9 //澶勭悊瀹屾垚
+
+
+ File_Type_Img = 0
+ File_Type_Video = 1
+ File_Type_Audio = 2
+ File_Img_Id_Pre = "img_"
+ File_Video_Id_Pre = "video_"
+ File_Audio_Id_Pre = "audio_"
+ File_Other_Id_Pre = "other_"
+ File_Status_Delete = -1 //鍒犻櫎鐘舵��
+ File_Status_Pause = 0 //鏆傚仠鐘舵��
+ File_Status_Wait = 1 //绛夊緟澶勭悊
+ File_Status_Doing = 2 //澶勭悊涓�
+ File_Status_Complete = 9 //宸插畬鎴�
+ File_Status_DealErr = -2 //澶勭悊澶辫触
+
+
+ Proc_Api_Gateway = "api-gateway"
+ Proc_Camera_Service = "camera-service"
+ Proc_App_Center = "appcenter-service"
+ Proc_Chanmanage_Service = "chanmanage-service"
+ Proc_CompTable_Service = "compTable-service"
+ Proc_Gb28181_Service = "gb28181-service"
+ Proc_Push_Service = "push-service" //
+ Proc_Realtime_Service = "realtime-service" //瀹炴椂鐩戞帶
+ Proc_Scene_Service = "scene-service" //鍦烘櫙閰嶇疆杩涚▼
+ Proc_Search_Service = "search-service" //鏌ヨ妫�绱㈣繘绋�
+ Proc_Stack_Service = "stack-service" //鏁版嵁鏍堣繘绋�
+ Proc_System_Service = "system-service" //绯荤粺鏈嶅姟
+)
diff --git a/mc/micronode.go b/mc/micronode.go
new file mode 100644
index 0000000..95d28cf
--- /dev/null
+++ b/mc/micronode.go
@@ -0,0 +1,249 @@
+package mc
+
+import (
+ "basic.com/valib/bhomebus.git"
+ "bhomeclient"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "os"
+ "time"
+)
+
+type MicroNode struct {
+ ctx context.Context
+ handle *bhomeclient.BHBus
+ reg *bhomeclient.RegisterInfo
+ procInfo *bhomeclient.ProcInfo
+ handlers map[string]MicroFunc
+ serverId string
+ fnLog func(...interface{})
+
+ SubChM map[string]chan *bhomeclient.MsgInfo //浠ヨ闃呯殑涓婚涓簁ey
+}
+
+func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
+ conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog)
+ handle, err := bhomeclient.Register(ctx, q, conf, reg)
+ if err != nil {
+ return nil, err
+ }
+ mn := &MicroNode {
+ serverId: serverId,
+ handle: handle,
+ reg: reg,
+ procInfo: procInfo,
+ fnLog: fnLog,
+ SubChM: make(map[string]chan *bhomeclient.MsgInfo),
+ }
+ for _,subTopic := range reg.SubTopic {
+ mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512)
+ }
+
+ return mn, nil
+}
+
+func (ms *MicroNode) printLog(v ...interface{}) {
+ if ms.fnLog != nil {
+ ms.fnLog(v...)
+ } else {
+ fmt.Println(v...)
+ }
+}
+
+func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) {
+ ms.handle.UpdateNodeTopics(ts)
+}
+
+func (ms *MicroNode) DeRegister() error {
+ if ms.handle != nil {
+ return ms.handle.DeRegister(ms.reg)
+ }
+ return errors.New("ms.handle is nil")
+}
+
+func (ms *MicroNode) startHeartbeat() {
+ hbi := &bhomeclient.HeartBeatInfo{
+ HealthLevel: "health",
+ Fps: 12,
+ WarnInfo: "warn",
+ ErrorInfo: "error",
+ Proc: *ms.procInfo,
+ }
+
+ t := time.NewTicker(time.Second)
+ defer t.Stop()
+
+ for {
+ select {
+ case <-ms.ctx.Done():
+ return
+ case <-t.C:
+ ms.handle.HeartBeat(hbi)
+ }
+ }
+}
+
+func (ms *MicroNode) StartClient() {
+ go ms.startHeartbeat()
+}
+
+func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
+ ms.handlers = funcMap
+
+ go ms.startHeartbeat()
+
+ for {
+ select {
+ case <- ms.ctx.Done():
+ return
+ default:
+ msgS, msgR, keyR := ms.handle.GetMsg()
+ if msgS != nil {
+ //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭�
+ ms.printLog("Recv Sub Message:", string(msgS.Body))
+ if ch,ok := ms.SubChM[msgS.Topic];ok {
+ ch <- msgS
+ }
+ }
+ if msgR != nil {
+ //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭�
+ go ms.serve(msgR, keyR)
+ }
+ }
+ }
+}
+
+func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
+ t := time.Now()
+ topicName := request.Header("Servicename")
+
+ if topicName == "" {
+ return nil,errors.New("Servicename 涓嶈兘涓虹┖")
+ }
+ ms.printLog("1:", time.Since(t))
+ t = time.Now()
+ rb, _ := json.Marshal(request)
+ msgR := &bhomeclient.MsgInfo {
+ Topic: request.Path,
+ Body: rb,
+ }
+ ms.printLog("2:", time.Since(t))
+ t = time.Now()
+ mi,err := ms.handle.Request(serverId, msgR, 5000)
+ if mi == nil || err != nil {
+ return nil, err
+ }
+ ms.printLog("3:", time.Since(t))
+ t = time.Now()
+ ri := new(Reply)
+ err = json.Unmarshal(mi.Body, ri)
+ if err != nil {
+ ms.printLog("unmarshal mi.Body err:", err)
+ ri = &Reply{
+ Success: false,
+ Msg: "鏈嶅姟璇锋眰澶辫触",
+ Data: "鏈嶅姟璇锋眰澶辫触",
+ }
+ }
+ ms.printLog("4:", time.Since(t))
+ return ri, nil
+}
+
+func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
+ rb, _ := json.Marshal(request)
+ msgR := &bhomeclient.MsgInfo{
+ Topic: request.Path,
+ Body: rb,
+ }
+
+ mi, err := ms.handle.Request(serverId, msgR, 5000)
+ if err != nil {
+ return nil, err
+ }
+ var ri *Reply
+ err = json.Unmarshal(mi.Body, ri)
+ if err != nil {
+ ri = &Reply{
+ Success: false,
+ Msg: "鏈嶅姟璇锋眰澶辫触",
+ Data: "鏈嶅姟璇锋眰澶辫触",
+ }
+ }
+ return ri, nil
+}
+
+//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級
+func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
+ if err != nil {
+ return nil
+ }
+ return netNodes
+}
+
+//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛�
+func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
+ netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
+ if err != nil {
+ return nil
+ }
+ return netNodes
+}
+
+func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) {
+ var reqBody Request
+ err := json.Unmarshal(msgR.Body, &reqBody)
+ if err != nil {
+ ms.printLog("serve unmarshal msgR.Body err:", err)
+ }
+
+ ms.printLog("reqBody:", reqBody)
+ var ri *Reply
+ if f,ok := ms.handlers[reqBody.Path];ok {
+ ri = f(&reqBody)
+ ms.printLog("call funcMap f,reply:", *ri)
+ } else {
+ ms.printLog("ms.funcMap not eixst path")
+ ri = &Reply{
+ Success: false,
+ Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl",
+ }
+ }
+ rd,err := json.Marshal(*ri)
+ if err != nil {
+ ms.printLog("marshal *ri err:", err)
+ }
+ rMsg := bhomeclient.MsgInfo{
+ Body: rd,
+ }
+ ms.handle.Reply(p, rMsg)
+}
+
+//鍙戝竷鍒版湰鏈�
+func (ms *MicroNode) Publish(topic string,msg []byte) error {
+ nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
+ return ms.PublishNet(nodes, topic, msg)
+}
+
+func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
+ pi := &bhomeclient.MsgInfo{
+ Topic: topic,
+ Body: msg,
+ }
+ return ms.handle.Pub(nodes, pi)
+}
+
+func (ms *MicroNode) Subscribe(topics []string) chan []byte {
+ ch := make(chan []byte)
+ return ch
+}
+
+//free handle
+func (ms *MicroNode) Free() {
+ if ms.handle != nil {
+ ms.handle.Free()
+ }
+}
\ No newline at end of file
diff --git a/mc/param.go b/mc/param.go
new file mode 100644
index 0000000..e01c1ba
--- /dev/null
+++ b/mc/param.go
@@ -0,0 +1,70 @@
+package mc
+
+import (
+ "encoding/json"
+ "errors"
+)
+
+type Request struct {
+ Path string `json:"path"`
+ Method string `json:"method"`
+ ContentType string `json:"contentType"`
+ HeaderMap map[string][]string `json:"headerMap"`
+ QueryMap map[string][]string `json:"queryMap"`
+ FormMap map[string][]string `json:"formMap"`
+ PostFormMap map[string][]string `json:"postFormMap"`
+ Body []byte `json:"body"`
+ File FileArg `json:"file"`
+ MultiFiles []FileArg `json:"multiFiles""`
+}
+
+type FileArg struct {
+ Name string `json:"name"`
+ Size int64 `json:"size"`
+ Bytes []byte `json:"bytes"`
+}
+
+type Reply struct {
+ Success bool `json:"success"`
+ Msg string `json:"msg"`
+ Data interface{} `json:"data"`
+}
+
+func (r *Request) Header(key string) string {
+ if values, ok := r.HeaderMap[key]; ok {
+ return values[0]
+ }
+ return ""
+}
+
+func (r *Request) Query(key string) string {
+ if values, ok := r.QueryMap[key]; ok {
+ return values[0]
+ }
+ return ""
+}
+
+func (r *Request) PostForm(key string) string {
+ if values, ok := r.PostFormMap[key]; ok {
+ return values[0]
+ }
+ return ""
+}
+
+func (r *Request) BindJSON(v interface{}) error {
+ return json.Unmarshal(r.Body, &v)
+}
+
+func (r *Request) FormFile() (*FileArg, error) {
+ if r.File.Name != "" && r.File.Size >0 && r.File.Bytes !=nil {
+ return &r.File,nil
+ }
+ return nil, errors.New("file not found")
+}
+
+func (r *Request) FormFiles() (*[]FileArg, error) {
+ if r.MultiFiles != nil && len(r.MultiFiles) >0 {
+ return &r.MultiFiles, nil
+ }
+ return nil,errors.New("multi files not found")
+}
diff --git a/mc/requestTopic.go b/mc/requestTopic.go
new file mode 100644
index 0000000..b3233fd
--- /dev/null
+++ b/mc/requestTopic.go
@@ -0,0 +1,8 @@
+package mc
+
+type MicroFunc func(req *Request) *Reply
+
+
+type Transport interface {
+ RequestTopic(string, Request) (*Reply,error)
+}
\ No newline at end of file
--
Gitblit v1.8.0