package bhomeclient

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"time"

	"basic.com/valib/c_bhomebus.git/proto/source/bhome_msg"
	jsoniter "github.com/json-iterator/go"
)

// MicroNode wraps a registered BHBus handle together with the registration
// data it was created from, and dispatches incoming requests to the handlers
// supplied to StartServer.
type MicroNode struct {
	ctx      context.Context
	handle   *BHBus
	reg      *RegisterInfo
	procInfo *ProcInfo
	handlers map[string]MicroFunc // request path -> handler; set by StartServer
	serverId string
	fnLog    func(...interface{}) // optional logger; printLog falls back to fmt.Println when nil

	// SubCh receives every message StartServer reads from the handle's ChSub.
	SubCh chan *bhome_msg.MsgPublish

	mtx     sync.Mutex // guards started and the assignment of handlers
	started bool       // true once StartServer has entered its dispatch loop
}

// NewMicroNode registers reg on the bus via Register and returns a MicroNode
// bound to the resulting handle. It returns the Register error unchanged when
// registration fails. fnLog may be nil.
func NewMicroNode(ctx context.Context, q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error) {
	conf := NewConfig(KEY_REGISTER, 512, 5, 60000, 60000, 2000, fnLog)
	handle, err := Register(ctx, q, conf, reg)
	if err != nil {
		return nil, err
	}

	mn := &MicroNode{
		ctx:      ctx,
		serverId: serverId,
		handle:   handle,
		reg:      reg,
		procInfo: &reg.Proc,
		fnLog:    fnLog,
		SubCh:    make(chan *bhome_msg.MsgPublish, 512),
	}
	//go startHeartbeat(ctx, handle)
	return mn, nil
}

// printLog writes v through fnLog when one was supplied, otherwise to stdout.
func (ms *MicroNode) printLog(v ...interface{}) {
	if ms.fnLog != nil {
		ms.fnLog(v...)
	} else {
		fmt.Println(v...)
	}
}

// UpdateNodeTopics forwards ts to the underlying handle.
func (ms *MicroNode) UpdateNodeTopics(ts []NodeInfo) {
	ms.handle.UpdateNodeTopics(ts)
}

// DeRegister de-registers this node's RegisterInfo through the handle.
// It returns an error when the node has no handle.
func (ms *MicroNode) DeRegister() error {
	if ms.handle != nil {
		return ms.handle.DeRegister(ms.reg)
	}
	return errors.New("ms.handle is nil")
}

// startHeartbeat calls h.HeartBeat once per second until ctx is cancelled.
// Its only call site in this file (in NewMicroNode) is commented out.
func startHeartbeat(ctx context.Context, h *BHBus) {
	t := time.NewTicker(1 * time.Second)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			h.HeartBeat()
		}
	}
}

// StartClient is a no-op.
func (ms *MicroNode) StartClient() {
}

// StartServer installs funcMap as the request handlers and then blocks,
// dispatching requests from the handle's ChReply to serve (one goroutine per
// request) and forwarding messages from ChSub to SubCh, until the node's
// context is cancelled. Only the first call runs the loop; later calls return
// immediately and leave the installed handlers untouched.
func (ms *MicroNode) StartServer(funcMap map[string]MicroFunc) {
	ms.mtx.Lock()
	if ms.started {
		ms.mtx.Unlock()
		return
	}
	ms.started = true
	// Assigned before any serve goroutine is started, so serve may read it
	// without taking the mutex.
	ms.handlers = funcMap
	ms.mtx.Unlock()

	for {
		select {
		case <-ms.ctx.Done():
			return
		case msgR := <-ms.handle.ChReply: // a request message from another process
			go ms.serve(ms.handle.ctx, &msgR)
		case msgS := <-ms.handle.ChSub:
			// SubCh is buffered; if nobody drains it the send would block
			// forever, so keep honouring cancellation while waiting.
			select {
			case ms.SubCh <- &msgS:
			case <-ms.ctx.Done():
				return
			}
		}
	}
}

// Request JSON-encodes request, wraps it in a MsgRequestTopic whose topic is
// request.Path, and sends it to serverId through the handle. milliSecs is
// passed through to the handle unchanged. An encoding failure is returned as
// an error instead of sending an empty payload.
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply, error) {
	t := time.Now()
	ms.printLog("1:", time.Since(t))

	t = time.Now()
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	rb, err := json.Marshal(request)
	if err != nil {
		return nil, fmt.Errorf("marshal request %q: %w", request.Path, err)
	}
	msgR := &bhome_msg.MsgRequestTopic{
		Topic: []byte(request.Path),
		Data:  rb,
	}
	ms.printLog("2:", time.Since(t))

	return ms.handle.Request(serverId, msgR, milliSecs)
}

// RequestTopic behaves like Request but emits no timing logs.
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply, error) {
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	rb, err := json.Marshal(request)
	if err != nil {
		return nil, fmt.Errorf("marshal request %q: %w", request.Path, err)
	}
	msgR := &bhome_msg.MsgRequestTopic{
		Topic: []byte(request.Path),
		Data:  rb,
	}

	return ms.handle.Request(serverId, msgR, milliSecs)
}

// RequestOnly forwards an already-built request and destination list to the
// handle and returns its result unchanged.
func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
	return ms.handle.RequestOnly(req, dest)
}

// GetLocalNetNodeByTopic looks up the key of a topic on the local machine
// (the result is expected to hold a single element). It queries the handle
// with this node's serverId and returns nil, after logging, when the lookup
// fails.
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []*bhome_msg.MsgQueryTopicReply_BHNodeAddress {
	netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
	if err != nil {
		ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err)
		return nil
	}
	return netNodes
}

// Get the key information of a topic on every node in the cluster (the result
// may hold several elements).
//func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
//	netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
//	if err != nil {
//		return nil
//	}
//	return netNodes
//}

// GetRegisteredClient returns whatever the handle's RequestCenter call yields.
func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
	return ms.handle.RequestCenter()
}

// serve decodes one incoming request, runs the handler registered for its
// path, and sends the handler's reply back to msgR.Src. Decode failures and
// unknown paths are answered with an explanatory Reply. It does nothing when
// no handlers have been installed.
func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
	if ms.handlers == nil {
		return
	}

	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	var reqBody Request
	var ri *Reply

	err := json.Unmarshal(msgR.Data, &reqBody)
	if err != nil {
		ms.printLog("serve unmarshal msgR.Body err:", err)
		ri = &Reply{
			Msg: err.Error(),
		}
	} else {
		ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:", reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)

		if f, ok := ms.handlers[reqBody.Path]; ok {
			reqBody.SrcProc = ProcInfo{
				ID: msgR.ProcId,
			}
			h := WrapperHandler{
				ms,
				ms,
			}
			select {
			case <-ctx.Done():
				// NOTE(review): ri stays nil here and is still passed to Reply
				// below, as before — confirm Reply tolerates a nil reply.
				ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
			default:
				ri = f(&h, &reqBody)
				// A handler may return nil; dereferencing it here would panic
				// inside this goroutine and bring the whole process down.
				if ri != nil {
					ms.printLog("call funcMap f,reply.Success:", ri.Success)
				} else {
					ms.printLog("call funcMap f,reply is nil")
				}
			}
		} else {
			ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
			ri = &Reply{
				Success: false,
				Msg:     "请求的接口不存在,请检查url",
				Data:    "请求的接口不存在,请检查url",
			}
		}
	}

	if retErr := ms.handle.Reply(msgR.Src, ri); retErr != nil {
		ms.printLog("retErr:", retErr)
	}
}

// Publish publishes msg under topic with an empty node list (the original
// comment describes this as publishing to the local machine).
func (ms *MicroNode) Publish(topic string, msg []byte) error {
	var nodes []bhome_msg.BHAddress
	return ms.PublishNet(nodes, topic, msg)
}

// PublishNet wraps topic and data in a MsgPublish and hands it, together with
// nodes, to the handle's Pub.
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string, data []byte) error {
	pi := &bhome_msg.MsgPublish{
		Topic: []byte(topic),
		Data:  data,
	}
	return ms.handle.Pub(nodes, pi)
}

// PublishNetTimeout is PublishNet with a timeout; it returns the int produced
// by the handle's PubTimeout unchanged.
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
	pi := &bhome_msg.MsgPublish{
		Topic: []byte(topic),
		Data:  data,
	}
	return ms.handle.PubTimeout(nodes, pi, timeout)
}

// Subscribe subscribes to topics through the handle and records each topic in
// the registration info's SubTopic list, skipping ones already present.
func (ms *MicroNode) Subscribe(topics []string) {
	ms.handle.Sub(topics)

	for _, t := range topics {
		found := false
		for _, it := range ms.reg.SubTopic {
			if it == t {
				found = true
				break
			}
		}
		if !found {
			// append works on a nil slice, so no explicit initialisation is needed.
			ms.reg.SubTopic = append(ms.reg.SubTopic, t)
		}
	}
}

// DeSub cancels the subscription to topics through the handle and removes
// them from the registration info's SubTopic list.
func (ms *MicroNode) DeSub(topics []string) {
	ms.printLog("DeSub topics:", topics)
	ms.handle.DeSub(topics)

	if ms.reg.SubTopic != nil {
		var leftTopics []string
		for _, t := range ms.reg.SubTopic {
			found := false
			for _, it := range topics {
				if it == t {
					found = true
					break
				}
			}
			if !found {
				leftTopics = append(leftTopics, t)
			}
		}
		ms.reg.SubTopic = leftTopics
	}
}

// Free releases the underlying handle, if there is one.
func (ms *MicroNode) Free() {
	if ms.handle != nil {
		ms.handle.Free()
	}
}