package mc import ( "basic.com/valib/bhomebus.git" "bhomeclient" "context" "encoding/json" "errors" "fmt" "os" "time" ) type MicroNode struct { ctx context.Context handle *bhomeclient.BHBus reg *bhomeclient.RegisterInfo procInfo *bhomeclient.ProcInfo handlers map[string]MicroFunc serverId string fnLog func(...interface{}) SubChM map[string]chan *bhomeclient.MsgInfo //以订阅的主题为key } func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *bhomeclient.RegisterInfo, procInfo *bhomeclient.ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ conf := bhomeclient.NewConfig(bhomeclient.KEY_REGISTER,512,5,10,10,100, fnLog) handle, err := bhomeclient.Register(ctx, q, conf, reg) if err != nil { return nil, err } mn := &MicroNode { serverId: serverId, handle: handle, reg: reg, procInfo: procInfo, fnLog: fnLog, SubChM: make(map[string]chan *bhomeclient.MsgInfo), } for _,subTopic := range reg.SubTopic { mn.SubChM[subTopic] = make(chan *bhomeclient.MsgInfo, 512) } return mn, nil } func (ms *MicroNode) printLog(v ...interface{}) { if ms.fnLog != nil { ms.fnLog(v...) } else { fmt.Println(v...) } } func (ms *MicroNode) UpdateNodeTopics(ts []bhomeclient.NodeInfo) { ms.handle.UpdateNodeTopics(ts) } func (ms *MicroNode) DeRegister() error { if ms.handle != nil { return ms.handle.DeRegister(ms.reg) } return errors.New("ms.handle is nil") } func (ms *MicroNode) startHeartbeat() { hbi := &bhomeclient.HeartBeatInfo{ HealthLevel: "health", Fps: 12, WarnInfo: "warn", ErrorInfo: "error", Proc: *ms.procInfo, } t := time.NewTicker(time.Second) defer t.Stop() for { select { case <-ms.ctx.Done(): return case <-t.C: ms.handle.HeartBeat(hbi) } } } func (ms *MicroNode) StartClient() { go ms.startHeartbeat() } func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) { ms.handlers = funcMap go ms.startHeartbeat() for { select { case <- ms.ctx.Done(): return default: msgS, msgR, keyR := ms.handle.GetMsg() if msgS != nil { //收到其它进程的发布消息 ms.printLog("Recv Sub Message:", string(msgS.Body)) if ch,ok := ms.SubChM[msgS.Topic];ok { ch <- msgS } } if msgR != nil { //收到其它进程的请求消息 go ms.serve(msgR, keyR) } } } } func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) { t := time.Now() topicName := request.Header("Servicename") if topicName == "" { return nil,errors.New("Servicename 不能为空") } ms.printLog("1:", time.Since(t)) t = time.Now() rb, _ := json.Marshal(request) msgR := &bhomeclient.MsgInfo { Topic: request.Path, Body: rb, } ms.printLog("2:", time.Since(t)) t = time.Now() mi,err := ms.handle.Request(serverId, msgR, 5000) if mi == nil || err != nil { return nil, err } ms.printLog("3:", time.Since(t)) t = time.Now() ri := new(Reply) err = json.Unmarshal(mi.Body, ri) if err != nil { ms.printLog("unmarshal mi.Body err:", err) ri = &Reply{ Success: false, Msg: "服务请求失败", Data: "服务请求失败", } } ms.printLog("4:", time.Since(t)) return ri, nil } func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) { rb, _ := json.Marshal(request) msgR := &bhomeclient.MsgInfo{ Topic: request.Path, Body: rb, } mi, err := ms.handle.Request(serverId, msgR, 5000) if err != nil { return nil, err } var ri *Reply err = json.Unmarshal(mi.Body, ri) if err != nil { ri = &Reply{ Success: false, Msg: "服务请求失败", Data: "服务请求失败", } } return ri, nil } //获取本机中某一个主题的 key (结果只有一个元素) func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) if err != nil { return nil } return netNodes } //获取集群中所有节点某个主题的key信息, (结果可能有多个) func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) if err != nil { return nil } return netNodes } func (ms *MicroNode) serve(msgR *bhomeclient.MsgInfo, p int) { var reqBody Request err := json.Unmarshal(msgR.Body, &reqBody) if err != nil { ms.printLog("serve unmarshal msgR.Body err:", err) } ms.printLog("reqBody:", reqBody) var ri *Reply if f,ok := ms.handlers[reqBody.Path];ok { ri = f(&reqBody) ms.printLog("call funcMap f,reply:", *ri) } else { ms.printLog("ms.funcMap not eixst path") ri = &Reply{ Success: false, Msg: "请求的接口不存在,请检查url", Data: "请求的接口不存在,请检查url", } } rd,err := json.Marshal(*ri) if err != nil { ms.printLog("marshal *ri err:", err) } rMsg := bhomeclient.MsgInfo{ Body: rd, } ms.handle.Reply(p, rMsg) } //发布到本机 func (ms *MicroNode) Publish(topic string,msg []byte) error { nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{}) return ms.PublishNet(nodes, topic, msg) } func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error { pi := &bhomeclient.MsgInfo{ Topic: topic, Body: msg, } return ms.handle.Pub(nodes, pi) } func (ms *MicroNode) Subscribe(topics []string) chan []byte { ch := make(chan []byte) return ch } //free handle func (ms *MicroNode) Free() { if ms.handle != nil { ms.handle.Free() } }