From c7069befa28a0f2594f0746044318a30d6989c19 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期日, 25 四月 2021 11:36:03 +0800 Subject: [PATCH] 使用bhsgo by lichao --- micronode.go | 175 +++++++++++++++------------------------------------------- 1 files changed, 45 insertions(+), 130 deletions(-) diff --git a/micronode.go b/micronode.go index 89e4088..a826412 100644 --- a/micronode.go +++ b/micronode.go @@ -1,7 +1,7 @@ package bhomeclient import ( - "basic.com/valib/bhomebus.git" + "basic.com/valib/bhshmq.git/proto/source/bhome_msg" "context" "encoding/json" "errors" @@ -20,7 +20,7 @@ serverId string fnLog func(...interface{}) - SubCh chan *MsgInfo + SubCh chan *bhome_msg.MsgPublish mtx sync.Mutex started bool @@ -39,7 +39,7 @@ reg: reg, procInfo: ®.Proc, fnLog: fnLog, - SubCh: make(chan *MsgInfo, 512), + SubCh: make(chan *bhome_msg.MsgPublish, 512), } return mn, nil @@ -65,14 +65,6 @@ } func (ms *MicroNode) startHeartbeat() { - hbi := &HeartBeatInfo{ - HealthLevel: "health", - Fps: 12, - WarnInfo: "warn", - ErrorInfo: "error", - Proc: *ms.procInfo, - } - t := time.NewTicker(1 * time.Second) defer t.Stop() @@ -81,7 +73,9 @@ case <-ms.ctx.Done(): return case <-t.C: - ms.handle.HeartBeat(hbi) + ms.handle.HeartBeat() + default: + time.Sleep(500 * time.Millisecond) } } } @@ -110,52 +104,18 @@ select { case <- ms.ctx.Done(): return + case msgR := <-ms.handle.ChReply: //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� + go ms.serve(ms.handle.ctx, &msgR) + case msgS := <-ms.handle.ChSub: + ms.printLog("Recv Sub Message:", string(msgS.Data)) + ms.SubCh <- &msgS default: - msgS, msgR, keyR := ms.handle.GetMsg() - if msgS != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� - ms.printLog("Recv Sub Message:", string(msgS.Body)) - ms.SubCh <- msgS - } - if msgR != nil { - //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� - go ms.serve(msgR, keyR) - } - time.Sleep(50 * time.Millisecond) } } - - //鎺ユ敹璁㈤槄鍒扮殑娑堟伅 - //go ms.startRecvSubMsg() - //浣滀负server鍚姩 - //ms.serve() } ms.mtx.Unlock() } - -//寮�濮嬫帴鏀惰闃呮秷鎭� -//func (ms *MicroNode) startRecvSubMsg() { -// for { -// select { -// case <- ms.ctx.Done(): -// return -// default: -// msgS, msgR, keyR := ms.handle.GetMsg() -// if msgS != nil { -// //鏀跺埌鍏跺畠杩涚▼鐨勫彂甯冩秷鎭� -// ms.printLog("Recv Sub Message:", string(msgS.Body)) -// ms.SubCh <- msgS -// } -// if msgR != nil { -// //鏀跺埌鍏跺畠杩涚▼鐨勮姹傛秷鎭� -// go ms.serve(msgR, keyR) -// } -// -// time.Sleep(50 * time.Millisecond) -// } -// } -//} func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) { t := time.Now() @@ -163,9 +123,9 @@ ms.printLog("1:", time.Since(t)) t = time.Now() rb, _ := json.Marshal(request) - msgR := &MsgInfo { - Topic: request.Path, - Body: rb, + msgR := &bhome_msg.MsgRequestTopic{ + Topic: []byte(request.Path), + Data: rb, } ms.printLog("2:", time.Since(t)) return ms.handle.Request(serverId, msgR, milliSecs) @@ -173,20 +133,20 @@ func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) { rb, _ := json.Marshal(request) - msgR := &MsgInfo{ - Topic: request.Path, - Body: rb, + msgR := &bhome_msg.MsgRequestTopic{ + Topic: []byte(request.Path), + Data: rb, } return ms.handle.Request(serverId, msgR, milliSecs) } -func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) { - return ms.handle.RequestOnly(rData, nodes) +func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) { + return ms.handle.RequestOnly(req, dest) } //鑾峰彇鏈満涓煇涓�涓富棰樼殑 key 锛堢粨鏋滃彧鏈変竴涓厓绱狅級 -func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode { +func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress { netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName) if err != nil { ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err) @@ -196,7 +156,7 @@ } //鑾峰彇闆嗙兢涓墍鏈夎妭鐐规煇涓富棰樼殑key淇℃伅锛� 锛堢粨鏋滃彲鑳芥湁澶氫釜锛� -func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode { +func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress { netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName) if err != nil { return nil @@ -205,11 +165,7 @@ } func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) { - r := MsgInfo{ - SrcProc: *ms.procInfo, - MsgType: MesgType_ReqRep, - Topic: TOPIC_QUERYPROC, - } + r := bhome_msg.MsgRequestTopic{} cr, err := ms.handle.RequestCenter(&r) if err != nil { ms.printLog("requestCenter reply:", cr, "err:", err) @@ -234,76 +190,37 @@ return nil, fmt.Errorf("GetRegisteredClient list failed") } -//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool { -// ri := &Reply{} -// if ms.handlers == nil { -// ri.Msg = "send wrong addr, check yourself!!!" -// } else { -// var msgR MsgInfo -// err := json.Unmarshal(rdata, &msgR) -// if err != nil { -// ri.Msg = err.Error() -// } else { -// var reqBody Request -// err = json.Unmarshal(rdata, &msgR.Body) -// if err != nil { -// ri.Msg = err.Error() -// } else { -// ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey) -// if f,ok := ms.handlers[reqBody.Path];ok { -// reqBody.SrcProc = msgR.SrcProc -// ri = f(&reqBody) -// ms.printLog("call funcMap f,reply:", *ri) -// } else { -// ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) -// ri.Msg = "璇锋眰鐨勬帴鍙d笉瀛樺湪锛岃妫�鏌rl" -// } -// } -// } -// } -// result, err := json.Marshal(*ri) -// if err != nil { -// sdata = nil -// } else { -// sdata = &result -// } -// return ri.Success -//} - -//func (ms *MicroNode) serve() { -// if ms.handlers == nil { -// return -// } -// for i:=0;i<10;i++ { -// ms.handle.wg.Add(1) -// go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog) -// } -//} - -func (ms *MicroNode) serve(msgR *MsgInfo, p int) { +func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) { if ms.handlers == nil { return } var reqBody Request var ri *Reply - err := json.Unmarshal(msgR.Body, &reqBody) + err := json.Unmarshal(msgR.Data, &reqBody) if err != nil { ms.printLog("serve unmarshal msgR.Body err:", err) ri = &Reply { Msg: err.Error(), } } else { - ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p) + ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap) if f,ok := ms.handlers[reqBody.Path];ok { - reqBody.SrcProc = msgR.SrcProc + reqBody.SrcProc = ProcInfo{ + ID: msgR.ProcId, + } h := WrapperHandler{ ms, ms, } - ri = f(&h, &reqBody) - ms.printLog("call funcMap f,reply.Success:", ri.Success) + select { + case <-ctx.Done(): + ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!") + default: + ri = f(&h, &reqBody) + ms.printLog("call funcMap f,reply.Success:", ri.Success) + } } else { ms.printLog("ms.funcMap not eixst path: ", reqBody.Path) ri = &Reply{ @@ -314,7 +231,7 @@ } } - retErr := ms.handle.Reply(p, ri) + retErr := ms.handle.Reply(msgR.Src, ri) if retErr != nil { ms.printLog("retErr:", retErr) } @@ -322,24 +239,22 @@ //鍙戝竷鍒版湰鏈� func (ms *MicroNode) Publish(topic string,msg []byte) error { - nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ - Key: 8, - }) + var nodes []bhome_msg.BHAddress return ms.PublishNet(nodes, topic, msg) } -func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error { - pi := &MsgInfo{ - Topic: topic, - Body: msg, +func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error { + pi := &bhome_msg.MsgPublish{ + Topic: []byte(topic), + Data: data, } return ms.handle.Pub(nodes, pi) } -func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int { - pi := &MsgInfo{ - Topic: topic, - Body: msg, +func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int { + pi := &bhome_msg.MsgPublish{ + Topic: []byte(topic), + Data: data, } return ms.handle.PubTimeout(nodes, pi, timeout) } -- Gitblit v1.8.0