| | |
| | | ) |
| | | |
| | | type MicroNode struct { |
| | | ctx context.Context |
| | | handle *BHBus |
| | | reg *RegisterInfo |
| | | procInfo *ProcInfo |
| | | handlers map[string]MicroFunc |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | ctx context.Context |
| | | handle *BHBus |
| | | reg *RegisterInfo |
| | | procInfo *ProcInfo |
| | | handlers map[string]MicroFunc |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | |
| | | SubCh chan *bhome_msg.MsgPublish |
| | | SubCh chan *bhome_msg.MsgPublish |
| | | |
| | | mtx sync.Mutex |
| | | started bool |
| | | mtx sync.Mutex |
| | | started bool |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := NewConfig(KEY_REGISTER,512,5,60000,60000,2000, fnLog) |
| | | func NewMicroNode(ctx context.Context, q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error) { |
| | | conf := NewConfig(KEY_REGISTER, 512, 5, 60000, 60000, 2000, fnLog) |
| | | handle, err := Register(ctx, q, conf, reg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | mn := &MicroNode { |
| | | ctx: ctx, |
| | | mn := &MicroNode{ |
| | | ctx: ctx, |
| | | serverId: serverId, |
| | | handle: handle, |
| | | reg: reg, |
| | |
| | | return |
| | | case <-t.C: |
| | | h.HeartBeat() |
| | | default: |
| | | time.Sleep(500 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | case <-ms.ctx.Done(): |
| | | return |
| | | case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息 |
| | | go ms.serve(ms.handle.ctx, &msgR) |
| | | case msgS := <-ms.handle.ChSub: |
| | | ms.SubCh <- &msgS |
| | | default: |
| | | time.Sleep(50 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | ms.mtx.Unlock() |
| | | } |
| | | |
| | | func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | | func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply, error) { |
| | | t := time.Now() |
| | | |
| | | ms.printLog("1:", time.Since(t)) |
| | |
| | | rb, _ := json.Marshal(request) |
| | | msgR := &bhome_msg.MsgRequestTopic{ |
| | | Topic: []byte(request.Path), |
| | | Data: rb, |
| | | Data: rb, |
| | | } |
| | | ms.printLog("2:", time.Since(t)) |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | | } |
| | | |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply, error) { |
| | | rb, _ := json.Marshal(request) |
| | | msgR := &bhome_msg.MsgRequestTopic{ |
| | | Topic: []byte(request.Path), |
| | | Data: rb, |
| | | Data: rb, |
| | | } |
| | | |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []*bhome_msg.MsgQueryTopicReply_BHNodeAddress { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) |
| | | if err != nil { |
| | | ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) |
| | | ms.printLog("topic:", topicName, " netNodes:", netNodes, "err:", err) |
| | | return nil |
| | | } |
| | | return netNodes |
| | |
| | | // return netNodes |
| | | //} |
| | | |
| | | func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info,error) { |
| | | func (ms *MicroNode) GetRegisteredClient() ([]*bhome_msg.MsgQueryProcReply_Info, error) { |
| | | return ms.handle.RequestCenter() |
| | | } |
| | | |
| | |
| | | err := json.Unmarshal(msgR.Data, &reqBody) |
| | | if err != nil { |
| | | ms.printLog("serve unmarshal msgR.Body err:", err) |
| | | ri = &Reply { |
| | | ri = &Reply{ |
| | | Msg: err.Error(), |
| | | } |
| | | } else { |
| | | ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap) |
| | | ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:", reqBody.FormMap, " postFormMap:", reqBody.PostFormMap) |
| | | |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | if f, ok := ms.handlers[reqBody.Path]; ok { |
| | | reqBody.SrcProc = ProcInfo{ |
| | | ID: msgR.ProcId, |
| | | } |
| | |
| | | ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "请求的接口不存在,请检查url", |
| | | Data: "请求的接口不存在,请检查url", |
| | | Msg: "请求的接口不存在,请检查url", |
| | | Data: "请求的接口不存在,请检查url", |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | //发布到本机 |
| | | func (ms *MicroNode) Publish(topic string,msg []byte) error { |
| | | func (ms *MicroNode) Publish(topic string, msg []byte) error { |
| | | var nodes []bhome_msg.BHAddress |
| | | return ms.PublishNet(nodes, topic, msg) |
| | | } |
| | | |
| | | func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error { |
| | | func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string, data []byte) error { |
| | | pi := &bhome_msg.MsgPublish{ |
| | | Topic: []byte(topic), |
| | | Data: data, |
| | | Data: data, |
| | | } |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | |
| | | func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int { |
| | | pi := &bhome_msg.MsgPublish{ |
| | | Topic: []byte(topic), |
| | | Data: data, |
| | | Data: data, |
| | | } |
| | | return ms.handle.PubTimeout(nodes, pi, timeout) |
| | | } |
| | |
| | | //订阅主题 |
| | | func (ms *MicroNode) Subscribe(topics []string) { |
| | | ms.handle.Sub(topics) |
| | | for _,t := range topics { |
| | | for _, t := range topics { |
| | | if ms.reg.SubTopic == nil { |
| | | ms.reg.SubTopic = make([]string, 0) |
| | | } |
| | | found := false |
| | | for _,it := range ms.reg.SubTopic { |
| | | for _, it := range ms.reg.SubTopic { |
| | | if it == t { |
| | | found = true |
| | | break |
| | |
| | | ms.handle.DeSub(topics) |
| | | if ms.reg.SubTopic != nil { |
| | | var leftTopics []string |
| | | for _,t := range ms.reg.SubTopic { |
| | | for _, t := range ms.reg.SubTopic { |
| | | found := false |
| | | for _,it := range topics { |
| | | for _, it := range topics { |
| | | if it == t { |
| | | found = true |
| | | break |