New file |
| | |
| | | package mc |
| | | |
| | | import "basic.com/valib/bhomebus.git" |
| | | |
| | | type Broker interface { |
| | | //发布到本机 |
| | | Publish(topic string, msg []byte) error |
| | | |
| | | //发布到远程机器 |
| | | PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error |
| | | |
| | | Subscribe(topics []string) chan []byte |
| | | } |
New file |
| | |
| | | package mc |
| | | |
// Camera kinds and the default floor assigned to a newly added camera.
const (
	Default_Layer       = -9999 // default floor for a camera that was just added
	TYPE_LOCAL_CAMERA   = 0     // local camera
	TYPE_GB28181_CAMERA = 1     // GB28181 (national standard) camera
)

// Camera run types.
const (
	TYPE_RUNTYPE_VIDEO    = -1 // monitoring only, no analysis
	TYPE_RUNTYPE_POLL     = 0  // tasks are run by polling
	TYPE_RUNTYPE_REALTIME = 1  // tasks are run in real time
)

// Camera processing states.
const (
	Camera_Status_NoRule = 0  // no rule configured
	Camera_Status_Wait   = 1  // waiting to be processed
	Camera_Status_Doing  = 2  // being processed
	Camera_Status_Other  = -1 // anything else
)

// Data-stack id prefix and processing states.
const (
	Id_Stack_Pre        = "stack_"
	Stack_Status_NoRule = 0 // no rule configured
	Stack_Status_Wait   = 1 // waiting to be processed
	Stack_Status_Doing  = 2 // being processed
	Stack_Status_Done   = 9 // processing finished
)

// File kinds, id prefixes and processing states.
const (
	File_Type_Img   = 0
	File_Type_Video = 1
	File_Type_Audio = 2

	File_Img_Id_Pre   = "img_"
	File_Video_Id_Pre = "video_"
	File_Audio_Id_Pre = "audio_"
	File_Other_Id_Pre = "other_"

	File_Status_Delete   = -1 // deleted
	File_Status_Pause    = 0  // paused
	File_Status_Wait     = 1  // waiting to be processed
	File_Status_Doing    = 2  // being processed
	File_Status_Complete = 9  // finished
	File_Status_DealErr  = -2 // processing failed
)

// Process names of the services.
const (
	Proc_Api_Gateway        = "api-gateway"
	Proc_Camera_Service     = "camera-service"
	Proc_App_Center         = "appcenter-service"
	Proc_Chanmanage_Service = "chanmanage-service"
	Proc_CompTable_Service  = "compTable-service"
	Proc_Gb28181_Service    = "gb28181-service"
	Proc_Push_Service       = "push-service"
	Proc_Realtime_Service   = "realtime-service" // real-time monitoring
	Proc_Scene_Service      = "scene-service"    // scene configuration process
	Proc_Search_Service     = "search-service"   // query/search process
	Proc_Stack_Service      = "stack-service"    // data stack process
	Proc_System_Service     = "system-service"   // system service
)
New file |
| | |
| | | package mc |
| | | |
| | | import ( |
| | | "basic.com/valib/bhomebus.git" |
| | | "bhomeclient" |
| | | "context" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "time" |
| | | ) |
| | | |
| | | type MicroNode struct { |
| | | ctx context.Context |
| | | handle *bhomeclient.BHBus |
| | | reg *bhomeclient.RegisterInfo |
| | | procInfo *bhomeclient.ProcInfo |
| | | handlers map[string]MicroFunc |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | |
| | | SubChM map[string]chan *bhomeclient.MsgInfo //以订阅的主题为key |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog) |
| | | handle, err := bhomeclient.Register(ctx, q, conf, reg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | mn := &MicroNode { |
| | | serverId: serverId, |
| | | handle: handle, |
| | | reg: reg, |
| | | procInfo: procInfo, |
| | | fnLog: fnLog, |
| | | SubChM: make(map[string]chan *bhomeclient.MsgInfo), |
| | | } |
| | | for _,subTopic := range reg.SubTopic { |
| | | mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512) |
| | | } |
| | | |
| | | return mn, nil |
| | | } |
| | | |
| | | func (ms *MicroNode) printLog(v ...interface{}) { |
| | | if ms.fnLog != nil { |
| | | ms.fnLog(v...) |
| | | } else { |
| | | fmt.Println(v...) |
| | | } |
| | | } |
| | | |
| | | func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) { |
| | | ms.handle.UpdateNodeTopics(ts) |
| | | } |
| | | |
| | | func (ms *MicroNode) DeRegister() error { |
| | | if ms.handle != nil { |
| | | return ms.handle.DeRegister(ms.reg) |
| | | } |
| | | return errors.New("ms.handle is nil") |
| | | } |
| | | |
| | | func (ms *MicroNode) startHeartbeat() { |
| | | hbi := &bhomeclient.HeartBeatInfo{ |
| | | HealthLevel: "health", |
| | | Fps: 12, |
| | | WarnInfo: "warn", |
| | | ErrorInfo: "error", |
| | | Proc: *ms.procInfo, |
| | | } |
| | | |
| | | t := time.NewTicker(time.Second) |
| | | defer t.Stop() |
| | | |
| | | for { |
| | | select { |
| | | case <-ms.ctx.Done(): |
| | | return |
| | | case <-t.C: |
| | | ms.handle.HeartBeat(hbi) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (ms *MicroNode) StartClient() { |
| | | go ms.startHeartbeat() |
| | | } |
| | | |
| | | func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) { |
| | | ms.handlers = funcMap |
| | | |
| | | go ms.startHeartbeat() |
| | | |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | return |
| | | default: |
| | | msgS, msgR, keyR := ms.handle.GetMsg() |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | if ch,ok := ms.SubChM[msgS.Topic];ok { |
| | | ch <- msgS |
| | | } |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | | go ms.serve(msgR, keyR) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) { |
| | | t := time.Now() |
| | | topicName := request.Header("Servicename") |
| | | |
| | | if topicName == "" { |
| | | return nil,errors.New("Servicename 不能为空") |
| | | } |
| | | ms.printLog("1:", time.Since(t)) |
| | | t = time.Now() |
| | | rb, _ := json.Marshal(request) |
| | | msgR := &bhomeclient.MsgInfo { |
| | | Topic: request.Path, |
| | | Body: rb, |
| | | } |
| | | ms.printLog("2:", time.Since(t)) |
| | | t = time.Now() |
| | | mi,err := ms.handle.Request(serverId, msgR, 5000) |
| | | if mi == nil || err != nil { |
| | | return nil, err |
| | | } |
| | | ms.printLog("3:", time.Since(t)) |
| | | t = time.Now() |
| | | ri := new(Reply) |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ms.printLog("unmarshal mi.Body err:", err) |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | ms.printLog("4:", time.Since(t)) |
| | | return ri, nil |
| | | } |
| | | |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) { |
| | | rb, _ := json.Marshal(request) |
| | | msgR := &bhomeclient.MsgInfo{ |
| | | Topic: request.Path, |
| | | Body: rb, |
| | | } |
| | | |
| | | mi, err := ms.handle.Request(serverId, msgR, 5000) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var ri *Reply |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | return ri, nil |
| | | } |
| | | |
| | | //获取本机中某一个主题的 key (结果只有一个元素) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | | return netNodes |
| | | } |
| | | |
| | | //获取集群中所有节点某个主题的key信息, (结果可能有多个) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | | return netNodes |
| | | } |
| | | |
| | | func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) { |
| | | var reqBody Request |
| | | err := json.Unmarshal(msgR.Body, &reqBody) |
| | | if err != nil { |
| | | ms.printLog("serve unmarshal msgR.Body err:", err) |
| | | } |
| | | |
| | | ms.printLog("reqBody:", reqBody) |
| | | var ri *Reply |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path") |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "请求的接口不存在,请检查url", |
| | | Data: "请求的接口不存在,请检查url", |
| | | } |
| | | } |
| | | rd,err := json.Marshal(*ri) |
| | | if err != nil { |
| | | ms.printLog("marshal *ri err:", err) |
| | | } |
| | | rMsg := bhomeclient.MsgInfo{ |
| | | Body: rd, |
| | | } |
| | | ms.handle.Reply(p, rMsg) |
| | | } |
| | | |
| | | //发布到本机 |
| | | func (ms *MicroNode) Publish(topic string,msg []byte) error { |
| | | nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{}) |
| | | return ms.PublishNet(nodes, topic, msg) |
| | | } |
| | | |
| | | func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error { |
| | | pi := &bhomeclient.MsgInfo{ |
| | | Topic: topic, |
| | | Body: msg, |
| | | } |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | | |
| | | func (ms *MicroNode) Subscribe(topics []string) chan []byte { |
| | | ch := make(chan []byte) |
| | | return ch |
| | | } |
| | | |
| | | //free handle |
| | | func (ms *MicroNode) Free() { |
| | | if ms.handle != nil { |
| | | ms.handle.Free() |
| | | } |
| | | } |
New file |
| | |
| | | package mc |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | ) |
| | | |
| | | type Request struct { |
| | | Path string `json:"path"` |
| | | Method string `json:"method"` |
| | | ContentType string `json:"contentType"` |
| | | HeaderMap map[string][]string `json:"headerMap"` |
| | | QueryMap map[string][]string `json:"queryMap"` |
| | | FormMap map[string][]string `json:"formMap"` |
| | | PostFormMap map[string][]string `json:"postFormMap"` |
| | | Body []byte `json:"body"` |
| | | File FileArg `json:"file"` |
| | | MultiFiles []FileArg `json:"multiFiles""` |
| | | } |
| | | |
// FileArg is one file carried inside a Request.
type FileArg struct {
	// Name is the file name.
	Name string `json:"name"`
	// Size is the file size; FormFile requires it to be positive.
	Size int64 `json:"size"`
	// Bytes is the file content.
	Bytes []byte `json:"bytes"`
}
| | | |
// Reply is the JSON-encoded answer to a Request.
type Reply struct {
	// Success reports whether the request was handled successfully.
	Success bool `json:"success"`
	// Msg is a human-readable message.
	Msg string `json:"msg"`
	// Data is the handler-specific payload.
	Data interface{} `json:"data"`
}
| | | |
| | | func (r *Request) Header(key string) string { |
| | | if values, ok := r.HeaderMap[key]; ok { |
| | | return values[0] |
| | | } |
| | | return "" |
| | | } |
| | | |
| | | func (r *Request) Query(key string) string { |
| | | if values, ok := r.QueryMap[key]; ok { |
| | | return values[0] |
| | | } |
| | | return "" |
| | | } |
| | | |
| | | func (r *Request) PostForm(key string) string { |
| | | if values, ok := r.PostFormMap[key]; ok { |
| | | return values[0] |
| | | } |
| | | return "" |
| | | } |
| | | |
| | | func (r *Request) BindJSON(v interface{}) error { |
| | | return json.Unmarshal(r.Body, &v) |
| | | } |
| | | |
| | | func (r *Request) FormFile() (*FileArg, error) { |
| | | if r.File.Name != "" && r.File.Size >0 && r.File.Bytes !=nil { |
| | | return &r.File,nil |
| | | } |
| | | return nil, errors.New("file not found") |
| | | } |
| | | |
| | | func (r *Request) FormFiles() (*[]FileArg, error) { |
| | | if r.MultiFiles != nil && len(r.MultiFiles) >0 { |
| | | return &r.MultiFiles, nil |
| | | } |
| | | return nil,errors.New("multi files not found") |
| | | } |
New file |
| | |
| | | package mc |
| | | |
| | | type MicroFunc func(req *Request) *Reply |
| | | |
| | | |
| | | type Transport interface { |
| | | RequestTopic(string, Request) (*Reply,error) |
| | | } |