From b9b16b451361341b990d3bbb78fc2d53b74202a0 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期四, 01 七月 2021 17:00:09 +0800 Subject: [PATCH] add SubscribeNet --- hbusc.go | 49 +++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 39 insertions(+), 10 deletions(-) diff --git a/hbusc.go b/hbusc.go index a09ea0e..62b2313 100644 --- a/hbusc.go +++ b/hbusc.go @@ -129,7 +129,7 @@ handle.wg = &sync.WaitGroup{} //鏈夎闃呮秷鎭墠闇�瑕佸惎鍔ㄥ崗绋嬫帴鏀舵秷鎭� - if ri.SubTopic != nil && len(ri.SubTopic) > 0 { + if len(ri.SubTopic) > 0 { handle.printLog("sub topics") var subList bhome_msg.MsgTopicList for _,v := range ri.SubTopic { @@ -137,11 +137,27 @@ } var subReply bhome_msg.MsgCommonReply - if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { - //鍚姩璁㈤槄淇℃伅鎺ユ敹 - handle.wg.Add(1) - go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) + if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.Subscribe ret false") } + } + + if len(ri.SubNetTopic) > 0 { + handle.printLog("sub net topics") + var subNetList bhome_msg.MsgTopicList + for _,v := range ri.SubNetTopic { + subNetList.TopicList = append(subNetList.TopicList, []byte(v)) + } + var subNetReply bhome_msg.MsgCommonReply + if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) { + handle.printLog("bhsgo.SubscribeNet ret false") + } + } + + if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 { + //鍚姩璁㈤槄淇℃伅鎺ユ敹 + handle.wg.Add(1) + go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) } return handle, nil @@ -218,9 +234,20 @@ //鑾峰彇topic瀵瑰簲鐨刱ey //濡傛灉浼犱簡serverId涓嶄负绌猴紝鍒欒幏鍙栨寚瀹氭満鍣ㄤ笂鐨則opic-key //濡傛灉server涓虹┖锛屽垯鑾峰彇鎵�鏈夎妭鐐逛笂topic-key -func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) { - - return nil, nil +func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { + dest := bhome_msg.BHAddress{} + reqTopic := bhome_msg.MsgQueryTopic{ + Topic: []byte(topic), + } + rep := bhome_msg.MsgQueryTopicReply{} + if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) { + return rep.NodeAddress, nil + } + if rep.Errmsg != nil { + h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString)) + return nil, errors.New(string(rep.Errmsg.ErrString)) + } + return nil, errors.New("bhsgo.QueryTopicAddress ret false") } func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { @@ -243,10 +270,12 @@ } } -func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) { +func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) { dest := bhome_msg.BHAddress{} if destArr != nil && len(destArr) > 0 { - dest = destArr[0] + if destArr[0].Addr != nil { + dest = *(destArr[0].Addr) + } } pid := "" r := bhome_msg.MsgRequestTopicReply{} -- Gitblit v1.8.0