package bhomeclient

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"time"

	"basic.com/valib/bhomebus.git"
)

// MicroNode wraps a registered bus handle together with the registration
// data, the request handlers and the channel on which received publish
// messages are delivered.
type MicroNode struct {
	ctx      context.Context
	handle   *BHBus
	reg      *RegisterInfo
	procInfo *ProcInfo
	handlers map[string]MicroFunc
	serverId string
	fnLog    func(...interface{})

	// SubCh receives every publish message read by StartServer.
	SubCh chan *MsgInfo
}

// NewMicroNode registers reg on the bus and returns a node bound to the
// resulting handle. fnLog may be nil, in which case log output goes to
// stdout. It returns an error when reg is nil or when Register fails.
func NewMicroNode(ctx context.Context, q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error) {
	// reg is dereferenced below (reg.Proc), so reject nil up front instead
	// of panicking after a registration has already been made.
	if reg == nil {
		return nil, errors.New("reg is nil")
	}

	conf := NewConfig(KEY_REGISTER, 512, 5, 1000, 1000, 1000, fnLog)
	handle, err := Register(ctx, q, conf, reg)
	if err != nil {
		return nil, err
	}

	mn := &MicroNode{
		ctx:      ctx,
		serverId: serverId,
		handle:   handle,
		reg:      reg,
		procInfo: &reg.Proc,
		fnLog:    fnLog,
		SubCh:    make(chan *MsgInfo, 512),
	}
	return mn, nil
}

// printLog writes v through the configured log function, falling back to
// fmt.Println when none was supplied.
func (ms *MicroNode) printLog(v ...interface{}) {
	if ms.fnLog != nil {
		ms.fnLog(v...)
		return
	}
	fmt.Println(v...)
}

// UpdateNodeTopics forwards ts to the underlying bus handle.
func (ms *MicroNode) UpdateNodeTopics(ts []NodeInfo) {
	ms.handle.UpdateNodeTopics(ts)
}

// DeRegister removes this node's registration from the bus. It returns an
// error when the node has no handle.
func (ms *MicroNode) DeRegister() error {
	if ms.handle == nil {
		return errors.New("ms.handle is nil")
	}
	return ms.handle.DeRegister(ms.reg)
}

// startHeartbeat sends a heartbeat once per second until the node's context
// is cancelled.
func (ms *MicroNode) startHeartbeat() {
	// NOTE(review): the status fields are fixed placeholder values; confirm
	// whether real health data should be reported here.
	hbi := &HeartBeatInfo{
		HealthLevel: "health",
		Fps:         12,
		WarnInfo:    "warn",
		ErrorInfo:   "error",
		Proc:        *ms.procInfo,
	}

	t := time.NewTicker(1 * time.Second)
	defer t.Stop()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case <-t.C:
			ms.handle.HeartBeat(hbi)
		}
	}
}

// StartClient starts the heartbeat in the background and returns
// immediately.
func (ms *MicroNode) StartClient() {
	go ms.startHeartbeat()
}

// StartServer installs funcMap as the request handlers, starts the heartbeat
// and then polls the bus for messages until the node's context is cancelled.
// Publish messages are forwarded to SubCh; each request message is served in
// its own goroutine. The call blocks for the lifetime of the server.
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
	ms.handlers = funcMap

	go ms.startHeartbeat()

	for {
		select {
		case <-ms.ctx.Done():
			return
		default:
		}

		msgS, msgR, keyR := ms.handle.GetMsg()
		if msgS != nil {
			// Received a publish message from another process.
			ms.printLog("Recv Sub Message:", string(msgS.Body))
			// Do not block forever when nobody drains SubCh: a full channel
			// must not prevent the loop from observing cancellation.
			select {
			case ms.SubCh <- msgS:
			case <-ms.ctx.Done():
				return
			}
		}
		if msgR != nil {
			// Received a request message from another process.
			go ms.serve(msgR, keyR)
		}

		time.Sleep(50 * time.Millisecond)
	}
}

// Request marshals request to JSON and sends it, with request.Path as the
// topic, to serverId through the bus handle. milliSecs is passed through to
// the handle unchanged. It returns an error when the request cannot be
// marshalled or when the handle reports one.
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply, error) {
	// The "1:"/"2:" lines are debug timing output kept from the original
	// implementation.
	t := time.Now()
	ms.printLog("1:", time.Since(t))

	t = time.Now()
	rb, err := json.Marshal(request)
	if err != nil {
		return nil, fmt.Errorf("marshal request err:%v", err)
	}
	msgR := &MsgInfo{
		Topic: request.Path,
		Body:  rb,
	}
	ms.printLog("2:", time.Since(t))

	return ms.handle.Request(serverId, msgR, milliSecs)
}

// RequestTopic behaves like Request without the timing log output.
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply, error) {
	rb, err := json.Marshal(request)
	if err != nil {
		return nil, fmt.Errorf("marshal request err:%v", err)
	}
	msgR := &MsgInfo{
		Topic: request.Path,
		Body:  rb,
	}
	return ms.handle.Request(serverId, msgR, milliSecs)
}

// RequestOnly passes the raw payload rData and the target nodes straight to
// the bus handle and returns its result.
func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
	return ms.handle.RequestOnly(rData, nodes)
}

// GetLocalNetNodeByTopic looks up the key of a topic on this machine (the
// lookup is scoped to this node's serverId). It logs the error and returns
// nil when the lookup fails.
// NOTE(review): the original comment says the result holds exactly one
// element; that is not enforced here.
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
	if err != nil {
		ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err)
		return nil
	}
	return netNodes
}

// GetAllNetNodesByTopic looks up the key information of a topic with an empty
// serverId, which per the original comment covers all nodes in the cluster
// (the result may hold several elements). It logs the error and returns nil
// when the lookup fails.
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
	netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
	if err != nil {
		// Log the failure the same way GetLocalNetNodeByTopic does instead
		// of dropping it silently.
		ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err)
		return nil
	}
	return netNodes
}

// GetRegisteredClient queries the center for the registered clients and
// decodes the reply data into a list. It returns an error when the request
// fails, when the center reports failure, or when the reply data cannot be
// converted.
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient, error) {
	r := MsgInfo{
		SrcProc: *ms.procInfo,
		MsgType: MesgType_ReqRep,
		Topic:   TOPIC_QUERYPROC,
	}

	cr, err := ms.handle.RequestCenter(&r)
	if err != nil {
		ms.printLog("requestCenter reply:", cr, "err:", err)
		return nil, err
	}
	if !cr.Success {
		ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data)
		return nil, fmt.Errorf("GetRegisteredClient list failed")
	}

	// Round-trip the reply data through JSON to convert it into the typed
	// list.
	rd, err := json.Marshal(cr.Data)
	if err != nil {
		return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
	}
	var list []RegisteredClient
	if err = json.Unmarshal(rd, &list); err != nil {
		ms.printLog("unmarshal to RegisteredClient list err:", err)
		return nil, fmt.Errorf("GetRegisteredClient list failed")
	}
	return list, nil
}

// serve decodes one request message, dispatches it to the handler registered
// for its path and sends the reply to key p. When no handlers are installed
// it returns without replying.
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
	if ms.handlers == nil {
		return
	}

	var reqBody Request
	var ri *Reply

	if err := json.Unmarshal(msgR.Body, &reqBody); err != nil {
		ms.printLog("serve unmarshal msgR.Body err:", err)
		ri = &Reply{
			Msg: err.Error(),
		}
	} else {
		ms.printLog("srcProc:", msgR.SrcProc, "reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:", reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)

		if f, ok := ms.handlers[reqBody.Path]; ok {
			reqBody.SrcProc = msgR.SrcProc
			ri = f(&reqBody)
			if ri == nil {
				// A handler that returns nil must not crash this goroutine
				// (and with it the process); answer with a failure instead.
				ms.printLog("handler returned nil reply, path:", reqBody.Path)
				ri = &Reply{
					Success: false,
					Msg:     "handler returned nil reply",
				}
			} else {
				ms.printLog("call funcMap f,reply:", *ri)
			}
		} else {
			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
			ri = &Reply{
				Success: false,
				Msg:     "请求的接口不存在,请检查url",
				Data:    "请求的接口不存在,请检查url",
			}
		}
	}

	if retErr := ms.handle.Reply(p, ri); retErr != nil {
		ms.printLog("retErr:", retErr)
	}
}

// Publish publishes msg under topic to this machine.
func (ms *MicroNode) Publish(topic string, msg []byte) error {
	// NOTE(review): key 8 is presumably the local bus key; confirm against
	// the bus configuration.
	nodes := []bhomebus.NetNode{{Key: 8}}
	return ms.PublishNet(nodes, topic, msg)
}

// PublishNet publishes msg under topic to the given nodes.
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error {
	pi := &MsgInfo{
		Topic: topic,
		Body:  msg,
	}
	return ms.handle.Pub(nodes, pi)
}

// Subscribe subscribes to topics on the bus and records each topic that is
// not yet present in the registration's SubTopic list.
func (ms *MicroNode) Subscribe(topics []string) {
	ms.handle.Sub(topics)

	known := make(map[string]struct{}, len(ms.reg.SubTopic)+len(topics))
	for _, t := range ms.reg.SubTopic {
		known[t] = struct{}{}
	}
	for _, t := range topics {
		if _, ok := known[t]; ok {
			continue
		}
		known[t] = struct{}{}
		ms.reg.SubTopic = append(ms.reg.SubTopic, t)
	}
}

// DeSub cancels the subscription to topics on the bus and removes them from
// the registration's SubTopic list.
func (ms *MicroNode) DeSub(topics []string) {
	ms.printLog("DeSub topics:", topics)

	ms.handle.DeSub(topics)

	if ms.reg.SubTopic == nil {
		return
	}

	removed := make(map[string]struct{}, len(topics))
	for _, t := range topics {
		removed[t] = struct{}{}
	}

	var leftTopics []string
	for _, t := range ms.reg.SubTopic {
		if _, ok := removed[t]; !ok {
			leftTopics = append(leftTopics, t)
		}
	}
	ms.reg.SubTopic = leftTopics
}

// Free releases the underlying bus handle, if there is one.
func (ms *MicroNode) Free() {
	if ms.handle != nil {
		ms.handle.Free()
	}
}