| | |
| | | src, |
| | | } |
| | | |
| | | logFn("ReadRequest topic:", string(msg.Topic), " data:", string(msg.Data)) |
| | | procId = "" |
| | | msg.Reset() |
| | | src = unsafe.Pointer(nil) |
| | |
| | | ctx: ctx, |
| | | conf: config, |
| | | ri: ri, |
| | | wg: &sync.WaitGroup{}, |
| | | ChSub: make(chan bhome_msg.MsgPublish, config.chSize), |
| | | ChReply: make(chan MsgReq, config.chSize), |
| | | } |
| | |
| | | handle.wg = &sync.WaitGroup{} |
| | | |
| | | // The receive goroutine only needs to be started when there are subscribed topics. |
| | | if ri.SubTopic != nil && len(ri.SubTopic) > 0 { |
| | | if len(ri.SubTopic) > 0 { |
| | | handle.printLog("sub topics") |
| | | var subList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubTopic { |
| | |
| | | } |
| | | |
| | | var subReply bhome_msg.MsgCommonReply |
| | | if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { |
| | | // Start receiving subscription messages. |
| | | handle.wg.Add(1) |
| | | go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) |
| | | if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { |
| | | handle.printLog("bhsgo.Subscribe ret false") |
| | | } |
| | | } |
| | | |
| | | if len(ri.SubNetTopic) > 0 { |
| | | handle.printLog("sub net topics") |
| | | var subNetList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubNetTopic { |
| | | subNetList.TopicList = append(subNetList.TopicList, []byte(v)) |
| | | } |
| | | var subNetReply bhome_msg.MsgCommonReply |
| | | if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) { |
| | | handle.printLog("bhsgo.SubscribeNet ret false") |
| | | } |
| | | } |
| | | |
| | | if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 { |
| | | // Start receiving subscription messages. |
| | | handle.wg.Add(1) |
| | | go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) |
| | | } |
| | | |
| | | return handle, nil |
| | |
| | | default: |
| | | if bhsgo.ReadSub(&procId, &msg, 100) { |
| | | ch <- msg |
| | | logFn("ReadSub topic:", string(msg.Topic), " data:", string(msg.Data)) |
| | | |
| | | procId = "" |
| | | msg.Reset() |
| | |
| | | // Free logs the call, blocks on h.wg.Wait(), then calls bhsgo.Cleanup(). |
| | | // Note that the "h.wg.Wait done" message is emitted after Cleanup returns, not right after Wait. |
| | | func (h *BHBus) Free() { |
| | | h.printLog("call BHBus free") |
| | | h.wg.Wait() |
| | | bhsgo.Cleanup() |
| | | h.printLog("h.wg.Wait done") |
| | | } |
| | | |
| | |
| | | // GetNetNodeByTopic gets the key (node address) registered for topic; the newer body returns rep.NodeAddress from bhsgo.QueryTopicAddress. |
| | | // If serverId is non-empty, the topic-key on that specific machine is meant to be returned. |
| | | // If serverId is empty, the topic-keys on all nodes are meant to be returned. NOTE(review): the visible body never reads serverId or srcProc - confirm. |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) { |
| | | |
| | | return nil, nil |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { |
| | | dest := bhome_msg.BHAddress{} |
| | | reqTopic := bhome_msg.MsgQueryTopic{ |
| | | Topic: []byte(topic), |
| | | } |
| | | rep := bhome_msg.MsgQueryTopicReply{} |
| | | if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) { |
| | | return rep.NodeAddress, nil |
| | | } |
| | | if rep.Errmsg != nil { |
| | | h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString)) |
| | | return nil, errors.New(string(rep.Errmsg.ErrString)) |
| | | } |
| | | return nil, errors.New("bhsgo.QueryTopicAddress ret false") |
| | | } |
| | | |
| | | func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { |
| | |
| | | |
| | | return &reply, nil |
| | | } else { |
| | | i, s := bhsgo.GetLastError() |
| | | h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic)) |
| | | return nil, errors.New("request ") |
| | | } |
| | | } |
| | | |
| | | // RequestOnly sends req through bhsgo.Request using h.conf.sendTimeOut and returns the reply's Data. |
| | | // The destination is taken from the first element of destArr when it is non-empty; otherwise a zero-value BHAddress is used. |
| | | // On failure it logs r.Errmsg and bhsgo.GetLastError() and returns a generic error. |
| | | // NOTE(review): two signatures and two forms of the dest assignment appear in this span - it looks like a before/after diff rendering; confirm which lines are current. |
| | | func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) { |
| | | func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) { |
| | | dest := bhome_msg.BHAddress{} |
| | | if destArr != nil && len(destArr) > 0 { |
| | | dest = destArr[0] |
| | | if destArr[0].Addr != nil { |
| | | dest = *(destArr[0].Addr) |
| | | } |
| | | } |
| | | pid := "" |
| | | r := bhome_msg.MsgRequestTopicReply{} |
| | | if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) { |
| | | return r.Data, nil |
| | | } else { |
| | | h.printLog("bhsgo.Request request err:", r.Errmsg) |
| | | i, s := bhsgo.GetLastError() |
| | | h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest) |
| | | return nil, errors.New("bhsgo.Request return false") |
| | | } |
| | | } |
| | |
| | | return errors.New("reply return false") |
| | | } |
| | | |
| | | // RequestCenter calls bhsgo.QueryProcs with an empty destination address and an empty MsgQueryProc, |
| | | // using h.conf.sendTimeOut, and returns rep.ProcList on success or an error when the call returns false. |
| | | // NOTE(review): the error text is spelled "flase"; also, two signatures appear in this span (apparently a before/after diff rendering) - confirm which is current. |
| | | func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) { |
| | | |
| | | return nil, errors.New("") |
| | | func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) { |
| | | dest := &bhome_msg.BHAddress{} |
| | | topic := &bhome_msg.MsgQueryProc{} |
| | | rep := &bhome_msg.MsgQueryProcReply{} |
| | | if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) { |
| | | return rep.ProcList, nil |
| | | } else { |
| | | return nil, errors.New("QueryProcs ret flase") |
| | | } |
| | | } |
| | | |
| | | |