liuxiaolong
2020-12-29 c1b804b45eaa3b5c325a36ab7fcdb6b09b8cdfd4
集成MicroNode
5个文件已添加
391 ■■■■■ 已修改文件
mc/broker.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mc/constants.go 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mc/micronode.go 249 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mc/param.go 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mc/requestTopic.go 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mc/broker.go
New file
@@ -0,0 +1,13 @@
package mc
import "basic.com/valib/bhomebus.git"
type Broker interface {
    //发布到本机
    Publish(topic string, msg []byte) error
    //发布到远程机器
    PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
    Subscribe(topics []string) chan []byte
}
mc/constants.go
New file
@@ -0,0 +1,51 @@
package mc
const (
    Default_Layer = -9999 //摄像机添加进来的默认楼层
    TYPE_LOCAL_CAMERA = 0 //本地摄像机
    TYPE_GB28181_CAMERA = 1 //国标摄像机
    TYPE_RUNTYPE_VIDEO = -1 //单纯的监控,不做分析
    TYPE_RUNTYPE_POLL = 0 //轮询做任务
    TYPE_RUNTYPE_REALTIME = 1 //实时做任务
    Camera_Status_NoRule = 0  //未配规则
    Camera_Status_Wait = 1  //等待处理
    Camera_Status_Doing = 2  //处理中
    Camera_Status_Other = -1 //其他情况
    Id_Stack_Pre = "stack_"
    Stack_Status_NoRule = 0  //未配规则
    Stack_Status_Wait = 1  //等待处理
    Stack_Status_Doing = 2  //处理中
    Stack_Status_Done = 9  //处理完成
    File_Type_Img = 0
    File_Type_Video = 1
    File_Type_Audio = 2
    File_Img_Id_Pre = "img_"
    File_Video_Id_Pre = "video_"
    File_Audio_Id_Pre = "audio_"
    File_Other_Id_Pre = "other_"
    File_Status_Delete = -1              //删除状态
    File_Status_Pause = 0               //暂停状态
    File_Status_Wait = 1                   //等待处理
    File_Status_Doing = 2                //处理中
    File_Status_Complete = 9            //已完成
    File_Status_DealErr = -2            //处理失败
    Proc_Api_Gateway = "api-gateway"
    Proc_Camera_Service = "camera-service"
    Proc_App_Center = "appcenter-service"
    Proc_Chanmanage_Service = "chanmanage-service"
    Proc_CompTable_Service = "compTable-service"
    Proc_Gb28181_Service = "gb28181-service"
    Proc_Push_Service = "push-service"   //
    Proc_Realtime_Service = "realtime-service" //实时监控
    Proc_Scene_Service = "scene-service"   //场景配置进程
    Proc_Search_Service = "search-service" //查询检索进程
    Proc_Stack_Service = "stack-service"   //数据栈进程
    Proc_System_Service = "system-service" //系统服务
)
mc/micronode.go
New file
@@ -0,0 +1,249 @@
package mc
import (
    "basic.com/valib/bhomebus.git"
    "bhomeclient"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "os"
    "time"
)
type MicroNode struct {
    ctx         context.Context
    handle         *bhomeclient.BHBus
    reg         *bhomeclient.RegisterInfo
    procInfo     *bhomeclient.ProcInfo
    handlers     map[string]MicroFunc
    serverId     string
    fnLog         func(...interface{})
    SubChM         map[string]chan *bhomeclient.MsgInfo //以订阅的主题为key
}
func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){
    conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog)
    handle, err := bhomeclient.Register(ctx, q, conf, reg)
    if err != nil {
        return nil, err
    }
    mn := &MicroNode {
        serverId: serverId,
        handle:   handle,
        reg:      reg,
        procInfo: procInfo,
        fnLog: fnLog,
        SubChM:   make(map[string]chan *bhomeclient.MsgInfo),
    }
    for _,subTopic := range reg.SubTopic {
        mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512)
    }
    return mn, nil
}
func (ms *MicroNode) printLog(v ...interface{}) {
    if ms.fnLog != nil {
        ms.fnLog(v...)
    } else {
        fmt.Println(v...)
    }
}
func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) {
    ms.handle.UpdateNodeTopics(ts)
}
func (ms *MicroNode) DeRegister() error {
    if ms.handle != nil {
        return ms.handle.DeRegister(ms.reg)
    }
    return errors.New("ms.handle is nil")
}
func (ms *MicroNode) startHeartbeat() {
    hbi := &bhomeclient.HeartBeatInfo{
        HealthLevel: "health",
        Fps:         12,
        WarnInfo:    "warn",
        ErrorInfo:   "error",
        Proc:    *ms.procInfo,
    }
    t := time.NewTicker(time.Second)
    defer t.Stop()
    for {
        select {
        case <-ms.ctx.Done():
            return
        case <-t.C:
            ms.handle.HeartBeat(hbi)
        }
    }
}
func (ms *MicroNode) StartClient() {
    go ms.startHeartbeat()
}
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
    ms.handlers = funcMap
    go ms.startHeartbeat()
    for {
        select {
        case <- ms.ctx.Done():
            return
        default:
            msgS, msgR, keyR := ms.handle.GetMsg()
            if msgS != nil {
                //收到其它进程的发布消息
                ms.printLog("Recv Sub Message:", string(msgS.Body))
                if ch,ok := ms.SubChM[msgS.Topic];ok {
                    ch <- msgS
                }
            }
            if msgR != nil {
                //收到其它进程的请求消息
                go ms.serve(msgR, keyR)
            }
        }
    }
}
func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) {
    t := time.Now()
    topicName := request.Header("Servicename")
    if topicName == "" {
        return nil,errors.New("Servicename 不能为空")
    }
    ms.printLog("1:", time.Since(t))
    t = time.Now()
    rb, _ := json.Marshal(request)
    msgR := &bhomeclient.MsgInfo {
        Topic: request.Path,
        Body: rb,
    }
    ms.printLog("2:", time.Since(t))
    t = time.Now()
    mi,err := ms.handle.Request(serverId, msgR, 5000)
    if mi == nil || err != nil {
        return nil, err
    }
    ms.printLog("3:", time.Since(t))
    t = time.Now()
    ri := new(Reply)
    err = json.Unmarshal(mi.Body, ri)
    if err != nil {
        ms.printLog("unmarshal mi.Body err:", err)
        ri = &Reply{
            Success: false,
            Msg: "服务请求失败",
            Data: "服务请求失败",
        }
    }
    ms.printLog("4:", time.Since(t))
    return ri, nil
}
func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) {
    rb, _ := json.Marshal(request)
    msgR := &bhomeclient.MsgInfo{
        Topic: request.Path,
        Body: rb,
    }
    mi, err := ms.handle.Request(serverId, msgR, 5000)
    if err != nil {
        return nil, err
    }
    var ri *Reply
    err = json.Unmarshal(mi.Body, ri)
    if err != nil {
        ri = &Reply{
            Success: false,
            Msg: "服务请求失败",
            Data: "服务请求失败",
        }
    }
    return ri, nil
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName)
    if err != nil {
        return nil
    }
    return netNodes
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode {
    netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName)
    if err != nil {
        return nil
    }
    return netNodes
}
func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) {
    var reqBody Request
    err := json.Unmarshal(msgR.Body, &reqBody)
    if err != nil {
        ms.printLog("serve unmarshal msgR.Body err:", err)
    }
    ms.printLog("reqBody:", reqBody)
    var ri *Reply
    if f,ok := ms.handlers[reqBody.Path];ok {
        ri = f(&reqBody)
        ms.printLog("call funcMap f,reply:", *ri)
    } else {
        ms.printLog("ms.funcMap not eixst path")
        ri = &Reply{
            Success: false,
            Msg: "请求的接口不存在,请检查url",
            Data: "请求的接口不存在,请检查url",
        }
    }
    rd,err := json.Marshal(*ri)
    if err != nil {
        ms.printLog("marshal *ri err:", err)
    }
    rMsg := bhomeclient.MsgInfo{
        Body: rd,
    }
    ms.handle.Reply(p, rMsg)
}
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
    nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{})
    return ms.PublishNet(nodes, topic, msg)
}
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
    pi := &bhomeclient.MsgInfo{
        Topic: topic,
        Body: msg,
    }
    return ms.handle.Pub(nodes, pi)
}
func (ms *MicroNode) Subscribe(topics []string) chan []byte {
    ch := make(chan []byte)
    return ch
}
//free handle
func (ms *MicroNode) Free() {
    if ms.handle != nil {
        ms.handle.Free()
    }
}
mc/param.go
New file
@@ -0,0 +1,70 @@
package mc
import (
    "encoding/json"
    "errors"
)
type Request struct {
    Path                    string                  `json:"path"`
    Method                  string                  `json:"method"`
    ContentType             string                  `json:"contentType"`
    HeaderMap               map[string][]string     `json:"headerMap"`
    QueryMap                map[string][]string     `json:"queryMap"`
    FormMap                 map[string][]string     `json:"formMap"`
    PostFormMap             map[string][]string     `json:"postFormMap"`
    Body                    []byte                  `json:"body"`
    File                    FileArg                 `json:"file"`
    MultiFiles                 []FileArg                `json:"multiFiles""`
}
type FileArg struct {
    Name                     string                     `json:"name"`
    Size                     int64                     `json:"size"`
    Bytes                    []byte                     `json:"bytes"`
}
type Reply struct {
    Success             bool                         `json:"success"`
    Msg                 string                         `json:"msg"`
    Data                 interface{}                 `json:"data"`
}
func (r *Request) Header(key string) string {
    if values, ok := r.HeaderMap[key]; ok {
        return values[0]
    }
    return ""
}
func (r *Request) Query(key string) string {
    if values, ok := r.QueryMap[key]; ok {
        return values[0]
    }
    return ""
}
func (r *Request) PostForm(key string) string {
    if values, ok := r.PostFormMap[key]; ok {
        return values[0]
    }
    return ""
}
func (r *Request) BindJSON(v interface{}) error {
    return json.Unmarshal(r.Body, &v)
}
func (r *Request) FormFile() (*FileArg, error) {
    if r.File.Name != "" && r.File.Size >0 && r.File.Bytes !=nil {
        return &r.File,nil
    }
    return nil, errors.New("file not found")
}
func (r *Request) FormFiles() (*[]FileArg, error) {
    if r.MultiFiles != nil && len(r.MultiFiles) >0 {
        return &r.MultiFiles, nil
    }
    return nil,errors.New("multi files not found")
}
mc/requestTopic.go
New file
@@ -0,0 +1,8 @@
package mc
type MicroFunc func(req *Request) *Reply
type Transport interface {
    RequestTopic(string, Request) (*Reply,error)
}