From b9b16b451361341b990d3bbb78fc2d53b74202a0 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期四, 01 七月 2021 17:00:09 +0800 Subject: [PATCH] add SubscribeNet --- hbusc.go | 66 ++++++++++++++++++++++++++------- 1 files changed, 52 insertions(+), 14 deletions(-) diff --git a/hbusc.go b/hbusc.go index 68a01fe..62b2313 100644 --- a/hbusc.go +++ b/hbusc.go @@ -129,7 +129,7 @@ handle.wg = &sync.WaitGroup{} //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭� - if ri.SubTopic != nil && len(ri.SubTopic) > 0 { + if len(ri.SubTopic) > 0 { handle.printLog("sub topics") var subList bhome_msg.MsgTopicList for _,v := range ri.SubTopic { @@ -137,11 +137,27 @@ } var subReply bhome_msg.MsgCommonReply - if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { - //鍚姩璁㈤槄淇℃伅鎺ユ敹 - handle.wg.Add(1) - go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) + if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.Subscribe ret false") } + } + + if len(ri.SubNetTopic) > 0 { + handle.printLog("sub net topics") + var subNetList bhome_msg.MsgTopicList + for _,v := range ri.SubNetTopic { + subNetList.TopicList = append(subNetList.TopicList, []byte(v)) + } + var subNetReply bhome_msg.MsgCommonReply + if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.SubscribeNet ret false") + } + } + + if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 { + //鍚姩璁㈤槄淇℃伅鎺ユ敹 + handle.wg.Add(1) + go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) } return handle, nil @@ -218,9 +234,20 @@ //鑾峰彇topic瀵瑰簲鐨刱ey //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key -func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) { - - return nil, nil +func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { + dest := bhome_msg.BHAddress{} + reqTopic := bhome_msg.MsgQueryTopic{ + Topic: []byte(topic), + } + rep := bhome_msg.MsgQueryTopicReply{} + if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) { + return rep.NodeAddress, nil + } + if rep.Errmsg != nil { + h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString)) + return nil, errors.New(string(rep.Errmsg.ErrString)) + } + return nil, errors.New("bhsgo.QueryTopicAddress ret false") } func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { @@ -237,21 +264,26 @@ return &reply, nil } else { + i, s := bhsgo.GetLastError() + h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic)) return nil, errors.New("request ") } } -func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) { +func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) { dest := bhome_msg.BHAddress{} if destArr != nil && len(destArr) > 0 { - dest = destArr[0] + if destArr[0].Addr != nil { + dest = *(destArr[0].Addr) + } } pid := "" r := bhome_msg.MsgRequestTopicReply{} if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) { return r.Data, nil } else { - h.printLog("bhsgo.Request request err:", r.Errmsg) + i, s := bhsgo.GetLastError() + h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest) return nil, errors.New("bhsgo.Request return false") } } @@ -271,9 +303,15 @@ return errors.New("reply return false") } -func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) { - - return nil, errors.New("") +func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) { + dest := &bhome_msg.BHAddress{} + topic := &bhome_msg.MsgQueryProc{} + rep := &bhome_msg.MsgQueryProcReply{} + if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) { + return rep.ProcList, nil + } else { + return nil, errors.New("QueryProcs ret flase") + } } -- Gitblit v1.8.0