From 81afb6ffbf7f76f49644a1832dcfe241552d7e08 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期五, 05 二月 2021 18:20:09 +0800 Subject: [PATCH] add recvandsend --- micronode.go | 238 +++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 151 insertions(+), 87 deletions(-) diff --git a/micronode.go b/micronode.go index eafe20b..9fa49a2 100644 --- a/micronode.go +++ b/micronode.go @@ -1,4 +1,4 @@ -package mc +package bhomeclient import ( "basic.com/valib/bhomebus.git" @@ -19,25 +19,23 @@ serverId string fnLog func(...interface{}) - SubChM map[string]chan *MsgInfo //浠ヨ闃呯殑涓婚涓簁ey + SubCh chan *MsgInfo } -func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, procInfo *ProcInfo,fnLog func(v ...interface{})) (*MicroNode, error){ - conf := NewConfig(KEY_REGISTER,512,5,10,10,100, fnLog) +func NewMicroNode(ctx context.Context,q chan os.Signal, serverId string, reg *RegisterInfo, fnLog func(v ...interface{})) (*MicroNode, error){ + conf := NewConfig(KEY_REGISTER,512,5,1000,1000,1000, fnLog) handle, err := Register(ctx, q, conf, reg) if err != nil { return nil, err } mn := &MicroNode { + ctx: ctx, serverId: serverId, handle: handle, reg: reg, - procInfo: procInfo, - fnLog: fnLog, - SubChM: make(map[string]chan *MsgInfo), - } - for _,subTopic := range reg.SubTopic { - mn.SubChM[subTopic] = make(chan *MsgInfo, 512) + procInfo: ®.Proc, + fnLog: fnLog, + SubCh: make(chan *MsgInfo, 512), } return mn, nil @@ -71,7 +69,7 @@ Proc: *ms.procInfo, } - t := time.NewTicker(time.Second) + t := time.NewTicker(1 * time.Second) defer t.Stop() for { @@ -92,35 +90,34 @@ ms.handlers = funcMap go ms.startHeartbeat() + //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 + go ms.startRecvSubMsg() + //浣滀负server鍚姩 + ms.serve() +} +//寮�濮嬫帴鏀惰闃呮秷鎭� +func (ms *MicroNode) startRecvSubMsg() { for { select { case <- ms.ctx.Done(): return default: - msgS, msgR, keyR := ms.handle.GetMsg() + msgS := ms.handle.GetMsg() if msgS != nil { //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� ms.printLog("Recv Sub Message:", string(msgS.Body)) - if ch,ok := ms.SubChM[msgS.Topic];ok { - ch <- msgS - } + ms.SubCh <- msgS } - if msgR != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� - go ms.serve(msgR, keyR) - } + + time.Sleep(50 * time.Millisecond) } } } -func (ms *MicroNode) Request(serverId string, request Request) (*Reply,error) { +func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { t := time.Now() - topicName := request.Header("Servicename") - if topicName == "" { - return nil,errors.New("Servicename 涓嶈兘涓虹┖") - } ms.printLog("1:", time.Since(t)) t = time.Now() rb, _ := json.Marshal(request) @@ -129,101 +126,123 @@ Body: rb, } ms.printLog("2:", time.Since(t)) - t = time.Now() - mi,err := ms.handle.Request(serverId, msgR, 5000) - if mi == nil || err != nil { - return nil, err - } - ms.printLog("3:", time.Since(t)) - t = time.Now() - ri := new(Reply) - err = json.Unmarshal(mi.Body, ri) - if err != nil { - ms.printLog("unmarshal mi.Body err:", err) - ri = &Reply{ - Success: false, - Msg: "鏈嶅姟璇锋眰澶辫触", - Data: "鏈嶅姟璇锋眰澶辫触", - } - } - ms.printLog("4:", time.Since(t)) - return ri, nil + return ms.handle.Request(serverId, msgR, milliSecs) } -func (ms *MicroNode) RequestTopic(serverId string, request Request) (*Reply,error) { +func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { rb, _ := json.Marshal(request) msgR := &MsgInfo{ Topic: request.Path, Body: rb, } - mi, err := ms.handle.Request(serverId, msgR, 5000) - if err != nil { - return nil, err - } - var ri *Reply - err = json.Unmarshal(mi.Body, ri) - if err != nil { - ri = &Reply{ - Success: false, - Msg: "鏈嶅姟璇锋眰澶辫触", - Data: "鏈嶅姟璇锋眰澶辫触", - } - } - return ri, nil + return ms.handle.Request(serverId, msgR, milliSecs) +} + +func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) { + return ms.handle.RequestOnly(rData, nodes) } //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級 -func (ms *MicroNode) GetLocalNetNodeByTopic(serviceName string) []bhomebus.NetNode { - netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, serviceName) +func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { + netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) if err != nil { + ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) return nil } return netNodes } //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛� -func (ms *MicroNode) GetAllNetNodesByTopic(serviceName string) []bhomebus.NetNode { - netNodes, err := ms.handle.GetNetNodeByTopic("", serviceName) +func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { + netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) if err != nil { return nil } return netNodes } -func (ms *MicroNode) serve(msgR *MsgInfo, p int) { - var reqBody Request - err := json.Unmarshal(msgR.Body, &reqBody) - if err != nil { - ms.printLog("serve unmarshal msgR.Body err:", err) +func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) { + r := MsgInfo{ + SrcProc: *ms.procInfo, + MsgType: MesgType_ReqRep, + Topic: TOPIC_QUERYPROC, } - - ms.printLog("reqBody:", reqBody) - var ri *Reply - if f,ok := ms.handlers[reqBody.Path];ok { - ri = f(&reqBody) - ms.printLog("call funcMap f,reply:", *ri) + cr, err := ms.handle.RequestCenter(&r) + if err != nil { + ms.printLog("requestCenter reply:", cr, "err:", err) + return nil, err + } + if cr.Success { + rd,err := json.Marshal(cr.Data) + if err == nil { + var list []RegisteredClient + err = json.Unmarshal(rd, &list) + if err == nil { + return list, nil + } else { + ms.printLog("unmarshal to RegisteredClient list err:", err) + } + } else { + return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) + } } else { - ms.printLog("ms.funcMap not eixst path") - ri = &Reply{ - Success: false, - Msg: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", - Data: "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl", + ms.printLog("request center failed,status:", cr.Success, "msg:", cr.Msg, " data:", cr.Data) + } + return nil, fmt.Errorf("GetRegisteredClient list failed") +} + +func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { + ri := &Reply{} + if ms.handlers == nil { + ri.Msg = "send wrong addr, check yourself!!!" + } else { + var msgR MsgInfo + err := json.Unmarshal(rdata, &msgR) + if err != nil { + ri.Msg = err.Error() + } else { + var reqBody Request + err = json.Unmarshal(rdata, &msgR.Body) + if err != nil { + ri.Msg = err.Error() + } else { + ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) + if f,ok := ms.handlers[reqBody.Path];ok { + reqBody.SrcProc = msgR.SrcProc + ri = f(&reqBody) + ms.printLog("call funcMap f,reply:", *ri) + } else { + ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) + ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" + } + } } } - rd,err := json.Marshal(*ri) + result, err := json.Marshal(*ri) if err != nil { - ms.printLog("marshal *ri err:", err) + sdata = nil + } else { + sdata = &result } - rMsg := MsgInfo{ - Body: rd, + return ri.Success +} + +func (ms *MicroNode) serve() { + if ms.handlers == nil { + return } - ms.handle.Reply(p, rMsg) + for i:=0;i<10;i++ { + ms.handle.wg.Add(1) + go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) + } } //鍙戝竷鍒版湰鏈� func (ms *MicroNode) Publish(topic string,msg []byte) error { - nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{}) + nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ + Key: 8, + }) return ms.PublishNet(nodes, topic, msg) } @@ -235,9 +254,54 @@ return ms.handle.Pub(nodes, pi) } -func (ms *MicroNode) Subscribe(topics []string) chan []byte { - ch := make(chan []byte) - return ch +func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int { + pi := &MsgInfo{ + Topic: topic, + Body: msg, + } + return ms.handle.PubTimeout(nodes, pi, timeout) +} + +//璁㈤槄涓婚 +func (ms *MicroNode) Subscribe(topics []string) { + ms.handle.Sub(topics) + for _,t := range topics { + if ms.reg.SubTopic == nil { + ms.reg.SubTopic = make([]string, 0) + } + found := false + for _,it := range ms.reg.SubTopic { + if it == t { + found = true + break + } + } + if !found { + ms.reg.SubTopic = append(ms.reg.SubTopic, t) + } + } +} + +//鍙栨秷璁㈤槄鐨勪富棰� +func (ms *MicroNode) DeSub(topics []string) { + ms.printLog("DeSub topics:", topics) + ms.handle.DeSub(topics) + if ms.reg.SubTopic != nil { + var leftTopics []string + for _,t := range ms.reg.SubTopic { + found := false + for _,it := range topics { + if it == t { + found = true + break + } + } + if !found { + leftTopics = append(leftTopics, t) + } + } + ms.reg.SubTopic = leftTopics + } } //free handle -- Gitblit v1.8.0