| | |
| | | package mc |
| | | package bhomeclient |
| | | |
| | | import ( |
| | | "basic.com/valib/bhomebus.git" |
| | |
| | | serverId string |
| | | fnLog func(...interface{}) |
| | | |
| | | SubChM map[string]chan *MsgInfo //以订阅的主题为key |
| | | SubCh chan *MsgInfo |
| | | } |
| | | |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog) |
| | | func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){ |
| | | conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog) |
| | | handle, err := Register(ctx, q, conf, reg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | mn := &MicroNode { |
| | | ctx: ctx, |
| | | serverId: serverId, |
| | | handle: handle, |
| | | reg: reg, |
| | | procInfo: procInfo, |
| | | fnLog: fnLog, |
| | | SubChM: make(map[string]chan *MsgInfo), |
| | | } |
| | | for _,subTopic := range reg.SubTopic { |
| | | mn.SubChM[subTopic] = make(chan *MsgInfo, 512) |
| | | procInfo: ®.Proc, |
| | | fnLog: fnLog, |
| | | SubCh: make(chan *MsgInfo, 512), |
| | | } |
| | | |
| | | return mn, nil |
| | |
| | | Proc: *ms.procInfo, |
| | | } |
| | | |
| | | t := time.NewTicker(time.Second) |
| | | t := time.NewTicker(1 * time.Second) |
| | | defer t.Stop() |
| | | |
| | | for { |
| | |
| | | ms.handlers = funcMap |
| | | |
| | | go ms.startHeartbeat() |
| | | //接收订阅到的消息 |
| | | go ms.startRecvSubMsg() |
| | | //作为server启动 |
| | | ms.serve() |
| | | } |
| | | |
| | | //开始接收订阅消息 |
| | | func (ms *MicroNode) startRecvSubMsg() { |
| | | for { |
| | | select { |
| | | case <- ms.ctx.Done(): |
| | | return |
| | | default: |
| | | msgS, msgR, keyR := ms.handle.GetMsg() |
| | | msgS := ms.handle.GetMsg() |
| | | if msgS != nil { |
| | | //收到其它进程的发布消息 |
| | | ms.printLog("Recv Sub Message:", string(msgS.Body)) |
| | | if ch,ok := ms.SubChM[msgS.Topic];ok { |
| | | ch <- msgS |
| | | } |
| | | ms.SubCh <- msgS |
| | | } |
| | | if msgR != nil { |
| | | //收到其它进程的请求消息 |
| | | go ms.serve(msgR, keyR) |
| | | } |
| | | |
| | | time.Sleep(50 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) { |
| | | func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | | t := time.Now() |
| | | topicName := request.Header("Servicename") |
| | | |
| | | if topicName == "" { |
| | | return nil,errors.New("Servicename 不能为空") |
| | | } |
| | | ms.printLog("1:", time.Since(t)) |
| | | t = time.Now() |
| | | rb, _ := json.Marshal(request) |
| | |
| | | Body: rb, |
| | | } |
| | | ms.printLog("2:", time.Since(t)) |
| | | t = time.Now() |
| | | mi,err := ms.handle.Request(serverId, msgR, 5000) |
| | | if mi == nil || err != nil { |
| | | return nil, err |
| | | } |
| | | ms.printLog("3:", time.Since(t)) |
| | | t = time.Now() |
| | | ri := new(Reply) |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ms.printLog("unmarshal mi.Body err:", err) |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | ms.printLog("4:", time.Since(t)) |
| | | return ri, nil |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | | } |
| | | |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) { |
| | | func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { |
| | | rb, _ := json.Marshal(request) |
| | | msgR := &MsgInfo{ |
| | | Topic: request.Path, |
| | | Body: rb, |
| | | } |
| | | |
| | | mi, err := ms.handle.Request(serverId, msgR, 5000) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var ri *Reply |
| | | err = json.Unmarshal(mi.Body, ri) |
| | | if err != nil { |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "服务请求失败", |
| | | Data: "服务请求失败", |
| | | } |
| | | } |
| | | return ri, nil |
| | | return ms.handle.Request(serverId, msgR, milliSecs) |
| | | } |
| | | |
| | | func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) { |
| | | return ms.handle.RequestOnly(rData, nodes) |
| | | } |
| | | |
| | | //获取本机中某一个主题的 key (结果只有一个元素) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) |
| | | func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) |
| | | if err != nil { |
| | | ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) |
| | | return nil |
| | | } |
| | | return netNodes |
| | | } |
| | | |
| | | //获取集群中所有节点某个主题的key信息, (结果可能有多个) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) |
| | | func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { |
| | | netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | | return netNodes |
| | | } |
| | | |
| | | func (ms *MicroNode) serve(msgR *MsgInfo, p int) { |
| | | var reqBody Request |
| | | err := json.Unmarshal(msgR.Body, &reqBody) |
| | | if err != nil { |
| | | ms.printLog("serve unmarshal msgR.Body err:", err) |
| | | func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) { |
| | | r := MsgInfo{ |
| | | SrcProc: *ms.procInfo, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_QUERYPROC, |
| | | } |
| | | |
| | | ms.printLog("reqBody:", reqBody) |
| | | var ri *Reply |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | cr, err := ms.handle.RequestCenter(&r) |
| | | if err != nil { |
| | | ms.printLog("requestCenter reply:", cr, "err:", err) |
| | | return nil, err |
| | | } |
| | | if cr.Success { |
| | | rd,err := json.Marshal(cr.Data) |
| | | if err == nil { |
| | | var list []RegisteredClient |
| | | err = json.Unmarshal(rd, &list) |
| | | if err == nil { |
| | | return list, nil |
| | | } else { |
| | | ms.printLog("unmarshal to RegisteredClient list err:", err) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path") |
| | | ri = &Reply{ |
| | | Success: false, |
| | | Msg: "请求的接口不存在,请检查url", |
| | | Data: "请求的接口不存在,请检查url", |
| | | ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data) |
| | | } |
| | | return nil, fmt.Errorf("GetRegisteredClient list failed") |
| | | } |
| | | |
| | | func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { |
| | | ri := &Reply{} |
| | | if ms.handlers == nil { |
| | | ri.Msg = "send wrong addr, check yourself!!!" |
| | | } else { |
| | | var msgR MsgInfo |
| | | err := json.Unmarshal(rdata, &msgR) |
| | | if err != nil { |
| | | ri.Msg = err.Error() |
| | | } else { |
| | | var reqBody Request |
| | | err = json.Unmarshal(rdata, &msgR.Body) |
| | | if err != nil { |
| | | ri.Msg = err.Error() |
| | | } else { |
| | | ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) |
| | | if f,ok := ms.handlers[reqBody.Path];ok { |
| | | reqBody.SrcProc = msgR.SrcProc |
| | | ri = f(&reqBody) |
| | | ms.printLog("call funcMap f,reply:", *ri) |
| | | } else { |
| | | ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) |
| | | ri.Msg = "请求的接口不存在,请检查url" |
| | | } |
| | | } |
| | | } |
| | | } |
| | | rd,err := json.Marshal(*ri) |
| | | result, err := json.Marshal(*ri) |
| | | if err != nil { |
| | | ms.printLog("marshal *ri err:", err) |
| | | sdata = nil |
| | | } else { |
| | | sdata = &result |
| | | } |
| | | rMsg := MsgInfo{ |
| | | Body: rd, |
| | | return ri.Success |
| | | } |
| | | |
| | | func (ms *MicroNode) serve() { |
| | | if ms.handlers == nil { |
| | | return |
| | | } |
| | | ms.handle.Reply(p, rMsg) |
| | | for i:=0;i<10;i++ { |
| | | ms.handle.wg.Add(1) |
| | | go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) |
| | | } |
| | | } |
| | | |
| | | //发布到本机 |
| | | func (ms *MicroNode) Publish(topic string,msg []byte) error { |
| | | nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{}) |
| | | nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: 8, |
| | | }) |
| | | return ms.PublishNet(nodes, topic, msg) |
| | | } |
| | | |
| | |
| | | return ms.handle.Pub(nodes, pi) |
| | | } |
| | | |
| | | func (ms *MicroNode) Subscribe(topics []string) chan []byte { |
| | | ch := make(chan []byte) |
| | | return ch |
| | | func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int { |
| | | pi := &MsgInfo{ |
| | | Topic: topic, |
| | | Body: msg, |
| | | } |
| | | return ms.handle.PubTimeout(nodes, pi, timeout) |
| | | } |
| | | |
| | | //订阅主题 |
| | | func (ms *MicroNode) Subscribe(topics []string) { |
| | | ms.handle.Sub(topics) |
| | | for _,t := range topics { |
| | | if ms.reg.SubTopic == nil { |
| | | ms.reg.SubTopic = make([]string, 0) |
| | | } |
| | | found := false |
| | | for _,it := range ms.reg.SubTopic { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | ms.reg.SubTopic = append(ms.reg.SubTopic, t) |
| | | } |
| | | } |
| | | } |
| | | |
| | | //取消订阅的主题 |
| | | func (ms *MicroNode) DeSub(topics []string) { |
| | | ms.printLog("DeSub topics:", topics) |
| | | ms.handle.DeSub(topics) |
| | | if ms.reg.SubTopic != nil { |
| | | var leftTopics []string |
| | | for _,t := range ms.reg.SubTopic { |
| | | found := false |
| | | for _,it := range topics { |
| | | if it == t { |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if !found { |
| | | leftTopics = append(leftTopics, t) |
| | | } |
| | | } |
| | | ms.reg.SubTopic = leftTopics |
| | | } |
| | | } |
| | | |
| | | //free handle |