From c1b804b45eaa3b5c325a36ab7fcdb6b09b8cdfd4 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 29 十二月 2020 10:15:10 +0800 Subject: [PATCH] 集成MicroNode --- mc/micronode.go | 249 +++++++++++++++++++++++++++++++++++ mc/broker.go | 13 + mc/constants.go | 51 +++++++ mc/param.go | 70 ++++++++++ mc/requestTopic.go | 8 + 5 files changed, 391 insertions(+), 0 deletions(-) diff --git a/mc/broker.go b/mc/broker.go new file mode 100644 index 0000000..1962e27 --- /dev/null +++ b/mc/broker.go @@ -0,0 +1,13 @@ +package mc + +import "basic.com/valib/bhomebus.git" + +type Broker interface { + //鍙戝竷鍒版湰鏈� + Publish(topic string, msg []byte) error + + //鍙戝竷鍒拌繙绋嬫満鍣� + PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error + + Subscribe(topics []string) chan []byte +} diff --git a/mc/constants.go b/mc/constants.go new file mode 100644 index 0000000..aac9a4c --- /dev/null +++ b/mc/constants.go @@ -0,0 +1,51 @@ +package mc + +const ( + Default_Layer = -9999 //鎽勫儚鏈烘坊鍔犺繘鏉ョ殑榛樿妤煎眰 + TYPE_LOCAL_CAMERA = 0 //鏈湴鎽勫儚鏈� + TYPE_GB28181_CAMERA = 1 //鍥芥爣鎽勫儚鏈� + + TYPE_RUNTYPE_VIDEO = -1 //鍗曠函鐨勭洃鎺э紝涓嶅仛鍒嗘瀽 + TYPE_RUNTYPE_POLL = 0 //杞鍋氫换鍔� + TYPE_RUNTYPE_REALTIME = 1 //瀹炴椂鍋氫换鍔� + + Camera_Status_NoRule = 0 //鏈厤瑙勫垯 + Camera_Status_Wait = 1 //绛夊緟澶勭悊 + Camera_Status_Doing = 2 //澶勭悊涓� + Camera_Status_Other = -1 //鍏朵粬鎯呭喌 + + Id_Stack_Pre = "stack_" + Stack_Status_NoRule = 0 //鏈厤瑙勫垯 + Stack_Status_Wait = 1 //绛夊緟澶勭悊 + Stack_Status_Doing = 2 //澶勭悊涓� + Stack_Status_Done = 9 //澶勭悊瀹屾垚 + + + File_Type_Img = 0 + File_Type_Video = 1 + File_Type_Audio = 2 + File_Img_Id_Pre = "img_" + File_Video_Id_Pre = "video_" + File_Audio_Id_Pre = "audio_" + File_Other_Id_Pre = "other_" + File_Status_Delete = -1 //鍒犻櫎鐘舵�� + File_Status_Pause = 0 //鏆傚仠鐘舵�� + File_Status_Wait = 1 //绛夊緟澶勭悊 + File_Status_Doing = 2 //澶勭悊涓� + File_Status_Complete = 9 //宸插畬鎴� + File_Status_DealErr = -2 //澶勭悊澶辫触 + + + Proc_Api_Gateway = "api-gateway" + Proc_Camera_Service = "camera-service" + Proc_App_Center = "appcenter-service" + Proc_Chanmanage_Service = "chanmanage-service" + Proc_CompTable_Service = "compTable-service" + Proc_Gb28181_Service = "gb28181-service" + Proc_Push_Service = "push-service" // + Proc_Realtime_Service = "realtime-service" //瀹炴椂鐩戞帶 + Proc_Scene_Service = "scene-service" //鍦烘櫙閰嶇疆杩涚▼ + Proc_Search_Service = "search-service" //鏌ヨ妫�绱㈣繘绋� + Proc_Stack_Service = "stack-service" //鏁版嵁鏍堣繘绋� + Proc_System_Service = "system-service" //绯荤粺鏈嶅姟 +) diff --git a/mc/micronode.go b/mc/micronode.go new file mode 100644 index 0000000..95d28cf --- /dev/null +++ b/mc/micronode.go @@ -0,0 +1,249 @@ +package mc + +import ( + "basic.com/valib/bhomebus.git" + "bhomeclient" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "time" +) + +type MicroNode struct { + ctx context.Context + handle *bhomeclient.BHBus + reg *bhomeclient.RegisterInfo + procInfo *bhomeclient.ProcInfo + handlers map[string]MicroFunc + serverId string + fnLog func(...interface{}) + + SubChM map[string]chan *bhomeclient.MsgInfo //浠ヨ闃呯殑涓婚涓簁ey +} + +func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ + conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog) + handle, err := bhomeclient.Register(ctx, q, conf, reg) + if err != nil { + return nil, err + } + mn := &MicroNode { + serverId: serverId, + handle: handle, + reg: reg, + procInfo: procInfo, + fnLog: fnLog, + SubChM: make(map[string]chan *bhomeclient.MsgInfo), + } + for _,subTopic := range reg.SubTopic { + mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512) + } + + return mn, nil +} + +func (ms *MicroNode) printLog(v ...interface{}) { + if ms.fnLog != nil { + ms.fnLog(v...) + } else { + fmt.Println(v...) + } +} + +func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) { + ms.handle.UpdateNodeTopics(ts) +} + +func (ms *MicroNode) DeRegister() error { + if ms.handle != nil { + return ms.handle.DeRegister(ms.reg) + } + return errors.New("ms.handle is nil") +} + +func (ms *MicroNode) startHeartbeat() { + hbi := &bhomeclient.HeartBeatInfo{ + HealthLevel: "health", + Fps: 12, + WarnInfo: "warn", + ErrorInfo: "error", + Proc: *ms.procInfo, + } + + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case <-ms.ctx.Done(): + return + case <-t.C: + ms.handle.HeartBeat(hbi) + } + } +} + +func (ms *MicroNode) StartClient() { + go ms.startHeartbeat() +} + +func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) { + ms.handlers = funcMap + + go ms.startHeartbeat() + + for { + select { + case <- ms.ctx.Done(): + return + default: + msgS, msgR, keyR := ms.handle.GetMsg() + if msgS != nil { + //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� + ms.printLog("Recv Sub Message:", string(msgS.Body)) + if ch,ok := ms.SubChM[msgS.Topic];ok { + ch <- msgS + } + } + if msgR != nil { + //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� + go ms.serve(msgR, keyR) + } + } + } +} + +func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) { + t := time.Now() + topicName := request.Header("Servicename") + + if topicName == "" { + return nil,errors.New("Servicename 涓嶈兘涓虹┖") + } + ms.printLog("1:", time.Since(t)) + t = time.Now() + rb, _ := json.Marshal(request) + msgR := &bhomeclient.MsgInfo { + Topic: request.Path, + Body: rb, + } + ms.printLog("2:", time.Since(t)) + t = time.Now() + mi,err := ms.handle.Request(serverId, msgR, 5000) + if mi == nil || err != nil { + return nil, err + } + ms.printLog("3:", time.Since(t)) + t = time.Now() + ri := new(Reply) + err = json.Unmarshal(mi.Body, ri) + if err != nil { + ms.printLog("unmarshal mi.Body err:", err) + ri = &Reply{ + Success: false, + Msg: "鏈嶅姟璇锋眰澶辫触", + Data: "鏈嶅姟璇锋眰澶辫触", + } + } + ms.printLog("4:", time.Since(t)) + return ri, nil +} + +func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) { + rb, _ := json.Marshal(request) + msgR := &bhomeclient.MsgInfo{ + Topic: request.Path, + Body: rb, + } + + mi, err := ms.handle.Request(serverId, msgR, 5000) + if err != nil { + return nil, err + } + var ri *Reply + err = json.Unmarshal(mi.Body, ri) + if err != nil { + ri = &Reply{ + Success: false, + Msg: "鏈嶅姟璇锋眰澶辫触", + Data: "鏈嶅姟璇锋眰澶辫触", + } + } + return ri, nil +} + +//鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級 +func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { + netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) + if err != nil { + return nil + } + return netNodes +} + +//鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛� +func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { + netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) + if err != nil { + return nil + } + return netNodes +} + +func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) { + var reqBody Request + err := json.Unmarshal(msgR.Body, &reqBody) + if err != nil { + ms.printLog("serve unmarshal msgR.Body err:", err) + } + + ms.printLog("reqBody:", reqBody) + var ri *Reply + if f,ok := ms.handlers[reqBody.Path];ok { + ri = f(&reqBody) + ms.printLog("call funcMap f,reply:", *ri) + } else { + ms.printLog("ms.funcMap not eixst path") + ri = &Reply{ + Success: false, + Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + } + } + rd,err := json.Marshal(*ri) + if err != nil { + ms.printLog("marshal *ri err:", err) + } + rMsg := bhomeclient.MsgInfo{ + Body: rd, + } + ms.handle.Reply(p, rMsg) +} + +//鍙戝竷鍒版湰鏈� +func (ms *MicroNode) Publish(topic string,msg []byte) error { + nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{}) + return ms.PublishNet(nodes, topic, msg) +} + +func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error { + pi := &bhomeclient.MsgInfo{ + Topic: topic, + Body: msg, + } + return ms.handle.Pub(nodes, pi) +} + +func (ms *MicroNode) Subscribe(topics []string) chan []byte { + ch := make(chan []byte) + return ch +} + +//free handle +func (ms *MicroNode) Free() { + if ms.handle != nil { + ms.handle.Free() + } +} \ No newline at end of file diff --git a/mc/param.go b/mc/param.go new file mode 100644 index 0000000..e01c1ba --- /dev/null +++ b/mc/param.go @@ -0,0 +1,70 @@ +package mc + +import ( + "encoding/json" + "errors" +) + +type Request struct { + Path string `json:"path"` + Method string `json:"method"` + ContentType string `json:"contentType"` + HeaderMap map[string][]string `json:"headerMap"` + QueryMap map[string][]string `json:"queryMap"` + FormMap map[string][]string `json:"formMap"` + PostFormMap map[string][]string `json:"postFormMap"` + Body []byte `json:"body"` + File FileArg `json:"file"` + MultiFiles []FileArg `json:"multiFiles""` +} + +type FileArg struct { + Name string `json:"name"` + Size int64 `json:"size"` + Bytes []byte `json:"bytes"` +} + +type Reply struct { + Success bool `json:"success"` + Msg string `json:"msg"` + Data interface{} `json:"data"` +} + +func (r *Request) Header(key string) string { + if values, ok := r.HeaderMap[key]; ok { + return values[0] + } + return "" +} + +func (r *Request) Query(key string) string { + if values, ok := r.QueryMap[key]; ok { + return values[0] + } + return "" +} + +func (r *Request) PostForm(key string) string { + if values, ok := r.PostFormMap[key]; ok { + return values[0] + } + return "" +} + +func (r *Request) BindJSON(v interface{}) error { + return json.Unmarshal(r.Body, &v) +} + +func (r *Request) FormFile() (*FileArg, error) { + if r.File.Name != "" && r.File.Size >0 && r.File.Bytes !=nil { + return &r.File,nil + } + return nil, errors.New("file not found") +} + +func (r *Request) FormFiles() (*[]FileArg, error) { + if r.MultiFiles != nil && len(r.MultiFiles) >0 { + return &r.MultiFiles, nil + } + return nil,errors.New("multi files not found") +} diff --git a/mc/requestTopic.go b/mc/requestTopic.go new file mode 100644 index 0000000..b3233fd --- /dev/null +++ b/mc/requestTopic.go @@ -0,0 +1,8 @@ +package mc + +type MicroFunc func(req *Request) *Reply + + +type Transport interface { + RequestTopic(string, Request) (*Reply,error) +} \ No newline at end of file -- Gitblit v1.8.0