| | |
| | | package bhomeclient |
| | | |
| | | import ( |
| | | "basic.com/valib/bhomebus.git" |
| | | "basic.com/valib/bhshmq.git/api/bhsgo" |
| | | "basic.com/valib/bhshmq.git/proto/source/bhome_msg" |
| | | "context" |
| | | "encoding/json" |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "strconv" |
| | | "sync" |
| | | "time" |
| | | "unsafe" |
| | | ) |
| | | |
| | | type sockServer struct { |
| | | sock *bhomebus.Socket |
| | | info *ProcInfo |
| | | } |
| | | |
| | | type sockClient struct { |
| | | sock *bhomebus.Socket |
| | | peer int |
| | | } |
| | | |
| | | type TransInfo struct { |
| | | info *MsgInfo |
| | | port int |
| | | type MsgReq struct { |
| | | ProcId string |
| | | bhome_msg.MsgRequestTopic |
| | | Src unsafe.Pointer |
| | | } |
| | | |
| | | type BHBus struct { |
| | | ctx context.Context |
| | | ctx context.Context |
| | | |
| | | conf *Config |
| | | ri *RegisterInfo |
| | | |
| | | nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步 |
| | | mtxNode sync.Mutex //访问节点主题表时,需要加锁 |
| | | conf *Config |
| | | |
| | | m map[string]*sockServer |
| | | nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步 |
| | | mtxNode sync.Mutex //访问节点主题表时,需要加锁 |
| | | |
| | | wg *sync.WaitGroup |
| | | wg *sync.WaitGroup |
| | | |
| | | sockRep *sockServer //响应其他进程request的socket,server |
| | | |
| | | sockHB *sockClient //维持心跳的socket,线程实时发送,需要单独处理 |
| | | |
| | | sockPub *sockClient //发布主题的socket,需要单独socket处理 |
| | | |
| | | sockSub *sockClient //订阅主题的socket,线程实时接收消息,需要单独处理 |
| | | |
| | | sockWorker *sockClient //发给任意的server,短暂的request client |
| | | mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的 |
| | | |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | | ChSub chan bhome_msg.MsgPublish |
| | | ChReply chan MsgReq |
| | | } |
| | | |
| | | //获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。 |
| | | func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) { |
| | | var data []byte |
| | | var key int |
| | | func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) { |
| | | var procId string |
| | | var msg bhome_msg.MsgRequestTopic |
| | | var src unsafe.Pointer |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | |
| | | wg.Done() |
| | | return |
| | | default: |
| | | n := s.RecvfromTimeout(&data, &key, 1000) |
| | | logFn("recvRoutine Recvfrom n:", n) |
| | | if n == 0 { |
| | | var info MsgInfo |
| | | if err := json.Unmarshal(data, &info);err == nil { |
| | | ch <- TransInfo{ |
| | | info: &info, |
| | | port: key, //这个key在发布订阅模式中是bus的key,是个固定值,上层用不到 |
| | | } |
| | | |
| | | data = []byte{} |
| | | key = 0 |
| | | if bhsgo.ReadRequest(&procId, &msg, &src, 100) { |
| | | ch <- MsgReq{ |
| | | procId, |
| | | msg, |
| | | src, |
| | | } |
| | | |
| | | logFn("ReadRequest topic:", string(msg.Topic), " data:", string(msg.Data)) |
| | | procId = "" |
| | | msg.Reset() |
| | | src = unsafe.Pointer(nil) |
| | | } else { |
| | | time.Sleep(10 * time.Millisecond) |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | //Register |
| | | func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { |
| | | handle := &BHBus{ |
| | | conf: config, |
| | | m: make(map[string]*sockServer), |
| | | chSub: make(chan TransInfo, config.chSize), |
| | | chReply: make(chan TransInfo, config.chSize), |
| | | handle := &BHBus { |
| | | ctx: ctx, |
| | | conf: config, |
| | | ri: ri, |
| | | wg: &sync.WaitGroup{}, |
| | | ChSub: make(chan bhome_msg.MsgPublish, config.chSize), |
| | | ChReply: make(chan MsgReq, config.chSize), |
| | | } |
| | | |
| | | var err error |
| | | err = bhomebus.Init("libshm_queue.so") |
| | | if err != nil { |
| | | handle.printLog("Init so err:", err) |
| | | return nil, err |
| | | } |
| | | err = bhomebus.ShmInit(512) |
| | | if err != nil { |
| | | handle.printLog("shmInit size err:", err) |
| | | return nil, err |
| | | } |
| | | regSock := bhomebus.OpenSocket() |
| | | if regSock == nil { |
| | | handle.printLog("Open Socket ret Nil") |
| | | return nil, errors.New("OpenSocket ret Nil") |
| | | } |
| | | defer func() { |
| | | regSock.Close() |
| | | handle.printLog("regSock.CLose") |
| | | }() |
| | | |
| | | var msg []byte |
| | | var regAddr []bhomebus.NetNode |
| | | var regR *RegisterReply //注册结果信息 |
| | | //如果注册失败,就会一直尝试注册 |
| | | procI := bhome_msg.ProcInfo{ |
| | | ProcId: []byte(ri.Proc.ID), |
| | | Name: []byte(ri.Proc.Name), |
| | | } |
| | | var regReply bhome_msg.MsgCommonReply |
| | | loop: |
| | | for { |
| | | select { |
| | |
| | | handle.printLog("register <-q") |
| | | return nil,errors.New("ctx is done") |
| | | default: |
| | | if msg == nil { |
| | | rid, err := json.Marshal(*ri) |
| | | if err != nil { |
| | | handle.printLog("marshal registerInfo err:", err) |
| | | return nil, errors.New("marshal registerInfo err:"+err.Error()) |
| | | } |
| | | s := MsgInfo{ |
| | | SrcProc: ri.Proc, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_REGISTER, |
| | | Body: rid, |
| | | } |
| | | handle.printLog("register MsgInfo:", s) |
| | | dRegData,err := json.Marshal(s) |
| | | if err != nil { |
| | | handle.printLog("marshal deregister msg err:", err) |
| | | return nil, err |
| | | } |
| | | msg = dRegData |
| | | } |
| | | if regAddr == nil { |
| | | regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: handle.conf.regKey, |
| | | }) |
| | | } |
| | | |
| | | var rMsg []bhomebus.Mesg |
| | | n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n代表成功发送的节点的个数 |
| | | handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) |
| | | if n == 1 && len(rMsg) == 1 { |
| | | var cr CommonReply |
| | | if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { |
| | | handle.printLog("unmarshal regReply err:", err) |
| | | return nil, errors.New("unmarshal regReply err:"+err.Error()) |
| | | } else { |
| | | if cr.Status == REPLY_SUCCESS { |
| | | var rr RegisterReply |
| | | if err = json.Unmarshal(cr.Body, &rr);err ==nil { |
| | | regR = &rr |
| | | break loop |
| | | } else { |
| | | handle.printLog("unmarshal RegisterReply err:", err) |
| | | } |
| | | |
| | | } else { |
| | | handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc) |
| | | } |
| | | |
| | | } |
| | | if bhsgo.Register(&procI, ®Reply, handle.conf.sendTimeOut) { |
| | | break loop |
| | | } else { |
| | | time.Sleep(100 * time.Millisecond) |
| | | time.Sleep(time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | handle.printLog("register Reply:", *regR) |
| | | |
| | | for _, v := range ri.Channel { |
| | | if k,ok := regR.ChannelKey[v];ok { |
| | | s := bhomebus.OpenSocket() |
| | | s.ForceBind(int(k)) |
| | | handle.m[v] = &sockServer{ |
| | | sock: s, |
| | | info: &ri.Proc, |
| | | if ri.PubTopic != nil && len(ri.PubTopic) > 0 { |
| | | topics := bhome_msg.MsgTopicList{} |
| | | var regTopicReply bhome_msg.MsgCommonReply |
| | | for _,t := range ri.PubTopic { |
| | | topics.TopicList = append(topics.TopicList, []byte(t)) |
| | | } |
| | | loopRT: |
| | | for { |
| | | select { |
| | | case <-q: |
| | | handle.printLog("RegisterTopics recv quit signal") |
| | | return nil, errors.New("RegisterTopics recv quit signal") |
| | | default: |
| | | if bhsgo.RegisterTopics(&topics, ®TopicReply, handle.conf.sendTimeOut) { |
| | | handle.printLog("bhsgo.RegisterTopics success!!") |
| | | break loopRT |
| | | } else { |
| | | time.Sleep(time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | handle.wg.Add(1) |
| | | go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog) |
| | | } |
| | | |
| | | //维持心跳的socket |
| | | sockHB := bhomebus.OpenSocket() |
| | | handle.sockHB = &sockClient{ |
| | | sock: sockHB, |
| | | peer: int(regR.HeartbeatKey), |
| | | } |
| | | handle.printLog("register done!" ) |
| | | |
| | | handle.wg = &sync.WaitGroup{} |
| | | |
| | | if ri.PubTopic != nil && len(ri.PubTopic) > 0 { |
| | | sockReply := bhomebus.OpenSocket() |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | | } |
| | | } |
| | | |
| | | |
| | | //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key |
| | | sockPub := bhomebus.OpenSocket() |
| | | handle.sockPub = &sockClient{ |
| | | sock: sockPub, |
| | | peer: -1, |
| | | } |
| | | |
| | | //有订阅消息才需要启动协程接收消息 |
| | | if ri.SubTopic != nil && len(ri.SubTopic) > 0 { |
| | | //订阅消息的socket |
| | | sockSub := bhomebus.OpenSocket() |
| | | //订阅所有主题 |
| | | if len(ri.SubTopic) > 0 { |
| | | handle.printLog("sub topics") |
| | | var subList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubTopic { |
| | | sockSub.Sub(v) |
| | | subList.TopicList = append(subList.TopicList, []byte(v)) |
| | | } |
| | | |
| | | var subReply bhome_msg.MsgCommonReply |
| | | if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) { |
| | | handle.printLog("bhsgo.Subscribe ret false") |
| | | } |
| | | } |
| | | |
| | | if len(ri.SubNetTopic) > 0 { |
| | | handle.printLog("sub net topics") |
| | | var subNetList bhome_msg.MsgTopicList |
| | | for _,v := range ri.SubNetTopic { |
| | | subNetList.TopicList = append(subNetList.TopicList, []byte(v)) |
| | | } |
| | | var subNetReply bhome_msg.MsgCommonReply |
| | | if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) { |
| | | handle.printLog("bhsgo.SubscribeNet ret false") |
| | | } |
| | | } |
| | | |
| | | if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 { |
| | | //启动订阅信息接收 |
| | | handle.wg.Add(1) |
| | | go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) |
| | | handle.sockSub = &sockClient{ |
| | | sock: sockSub, |
| | | peer: -1, |
| | | } |
| | | } |
| | | |
| | | sockWorker := bhomebus.OpenSocket() |
| | | handle.sockWorker = &sockClient{ |
| | | sock: sockWorker, |
| | | peer: int(regR.QueryTopicKey), |
| | | go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog) |
| | | } |
| | | |
| | | return handle, nil |
| | | } |
| | | |
| | | func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) { |
| | | var procId string |
| | | var msg bhome_msg.MsgPublish |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logFn("recvRoutine ctx.Done") |
| | | wg.Done() |
| | | return |
| | | default: |
| | | if bhsgo.ReadSub(&procId, &msg, 100) { |
| | | ch <- msg |
| | | logFn("ReadSub topic:", string(msg.Topic), " data:", string(msg.Data)) |
| | | |
| | | procId = "" |
| | | msg.Reset() |
| | | } else { |
| | | //time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | //DeRegister |
| | | func (h *BHBus) DeRegister(dri *RegisterInfo) error { |
| | | data, err := json.Marshal(*dri) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | dRegData,err := json.Marshal(MsgInfo{ |
| | | MsgType: "", |
| | | Topic: TOPIC_DEREGISTER, |
| | | Body: data, |
| | | }) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: h.conf.regKey, |
| | | }) |
| | | var retMsg []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut) |
| | | if n == 0 { |
| | | return nil |
| | | } |
| | | h.printLog("DeRegister retMsg:", retMsg) |
| | | return fmt.Errorf("DeRegister n:%d", n) |
| | | h.printLog("DeRegister") |
| | | return nil |
| | | } |
| | | |
| | | func (h *BHBus) printLog(v ...interface{}) { |
| | |
| | | func (h *BHBus) Free() { |
| | | h.printLog("call BHBus free") |
| | | h.wg.Wait() |
| | | bhsgo.Cleanup() |
| | | h.printLog("h.wg.Wait done") |
| | | for _,v := range h.m { |
| | | v.sock.Close() |
| | | } |
| | | if h.sockRep != nil { |
| | | h.sockRep.sock.Close() |
| | | h.sockRep = nil |
| | | } |
| | | if h.sockHB != nil { |
| | | h.sockHB.sock.Close() |
| | | h.sockHB = nil |
| | | } |
| | | if h.sockPub != nil { |
| | | h.sockPub.sock.Close() |
| | | h.sockPub = nil |
| | | } |
| | | if h.sockSub != nil { |
| | | h.sockSub.sock.Close() |
| | | h.sockSub = nil |
| | | } |
| | | if h.sockWorker != nil { |
| | | h.sockWorker.sock.Close() |
| | | h.sockWorker = nil |
| | | } |
| | | |
| | | h.printLog("BHBus Freed") |
| | | } |
| | | |
| | | |
| | | //HeartBeat send |
| | | func (h *BHBus) HeartBeat(info *HeartBeatInfo) error { |
| | | data, err := json.Marshal(*info) |
| | | if err == nil { |
| | | hbd,err := json.Marshal(MsgInfo{ |
| | | SrcProc: info.Proc, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_HEARTBEAT, |
| | | Body: data, |
| | | }) |
| | | if err != nil { |
| | | h.printLog("marshal heartbeat msgInfo err:", err) |
| | | return err |
| | | } |
| | | var rMsg []bhomebus.Mesg |
| | | hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: h.sockHB.peer, |
| | | }) |
| | | //h.printLog("start send heartbeat") |
| | | n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n代表成功发送的节点的个数 |
| | | //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) |
| | | |
| | | if n > 0 { |
| | | return nil |
| | | } else { |
| | | return fmt.Errorf("sockHB Sendandrecv ret n:%d", n) |
| | | } |
| | | func (h *BHBus) HeartBeat() error { |
| | | procI := bhome_msg.ProcInfo{ |
| | | ProcId: []byte(h.ri.Proc.ID), |
| | | Name: []byte(h.ri.Proc.Name), |
| | | } |
| | | return err |
| | | var ret bhome_msg.MsgCommonReply |
| | | if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) { |
| | | return nil |
| | | } else { |
| | | return errors.New("send heartBeat return false") |
| | | } |
| | | } |
| | | |
| | | //func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error { |
| | | // n := s.sock.SendtoTimeout(data, s.peer, timeout) |
| | | // if n == 0 { |
| | | // return nil |
| | | // } |
| | | // return errors.New("SendtoTimeout n:"+strconv.Itoa(n)) |
| | | //} |
| | | |
| | | |
| | | //更新主题列表 |
| | | func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { |
| | | func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { |
| | | h.mtxNode.Lock() |
| | | defer h.mtxNode.Unlock() |
| | | h.nodes = arr |
| | |
| | | //获取topic对应的key |
| | | //如果传了serverId不为空,则获取指定机器上的topic-key |
| | | //如果server为空,则获取所有节点上topic-key |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) { |
| | | h.mtxNode.Lock() |
| | | defer h.mtxNode.Unlock() |
| | | var nodes []bhomebus.NetNode |
| | | reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: h.sockWorker.peer, |
| | | }) |
| | | reqD,err := json.Marshal(MsgInfo{ |
| | | SrcProc: *srcProc, |
| | | MsgType: MesgType_ReqRep, |
| | | Topic: TOPIC_QUERYKEY, |
| | | Body: []byte(topic), |
| | | }) |
| | | if err != nil { |
| | | return nil, fmt.Errorf("marshal req err:%s", err.Error()) |
| | | func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) { |
| | | dest := bhome_msg.BHAddress{} |
| | | reqTopic := bhome_msg.MsgQueryTopic{ |
| | | Topic: []byte(topic), |
| | | } |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) |
| | | if n > 0 { |
| | | var reply CommonReply |
| | | err = json.Unmarshal(ret[0].Data, &reply) |
| | | if err != nil { |
| | | h.printLog("unmarshal err:", err) |
| | | return nil, err |
| | | } |
| | | |
| | | if reply.Status == REPLY_SUCCESS { |
| | | err = json.Unmarshal(reply.Body, &nodes) |
| | | if err == nil { |
| | | return nodes, nil |
| | | } else { |
| | | h.printLog("unmarshal err:", err, "nodes:", nodes) |
| | | return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | h.printLog("reply status:", reply.Status, "desc:", reply.Desc, "body:", string(reply.Body)) |
| | | return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) |
| | | rep := bhome_msg.MsgQueryTopicReply{} |
| | | if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) { |
| | | return rep.NodeAddress, nil |
| | | } |
| | | if rep.Errmsg != nil { |
| | | h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString)) |
| | | return nil, errors.New(string(rep.Errmsg.ErrString)) |
| | | } |
| | | return nil, errors.New("bhsgo.QueryTopicAddress ret false") |
| | | } |
| | | |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) { |
| | | func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) { |
| | | //1.首先需要通过topic拿到本机对应的NetNode |
| | | rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) |
| | | h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //2.将请求返送到对应的server,并等待返回值 |
| | | data, err := json.Marshal(*req) |
| | | h.printLog("marshal(*req) err:", err) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | var ret []bhomebus.Mesg |
| | | |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs) |
| | | h.printLog("Request n: ", n, " len(ret): ", len(ret)) |
| | | if n > 0 && len(ret) > 0 { |
| | | if err = json.Unmarshal(ret[0].Data, resp); err == nil { |
| | | return resp, nil |
| | | } else { |
| | | h.printLog("unmarshal ret[0].Data err:", err) |
| | | return nil, err |
| | | pid := "" |
| | | mrt := bhome_msg.MsgRequestTopicReply{} |
| | | dest := bhome_msg.BHAddress{} |
| | | if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) { |
| | | var reply Reply |
| | | if err := json.Unmarshal(mrt.Data, &reply); err != nil { |
| | | return nil,err |
| | | } |
| | | |
| | | return &reply, nil |
| | | } else { |
| | | i, s := bhsgo.GetLastError() |
| | | h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic)) |
| | | return nil, errors.New("request ") |
| | | } |
| | | return nil, fmt.Errorf("request err") |
| | | } |
| | | |
| | | func (h *BHBus) Reply(replyKey int, i MsgInfo) error { |
| | | data,err := json.Marshal(i) |
| | | func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) { |
| | | dest := bhome_msg.BHAddress{} |
| | | if destArr != nil && len(destArr) > 0 { |
| | | if destArr[0].Addr != nil { |
| | | dest = *(destArr[0].Addr) |
| | | } |
| | | } |
| | | pid := "" |
| | | r := bhome_msg.MsgRequestTopicReply{} |
| | | if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) { |
| | | return r.Data, nil |
| | | } else { |
| | | i, s := bhsgo.GetLastError() |
| | | h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest) |
| | | return nil, errors.New("bhsgo.Request return false") |
| | | } |
| | | } |
| | | |
| | | func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error { |
| | | data,err := json.Marshal(*i) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) |
| | | h.printLog("reply to key:", replyKey, " n:",n) |
| | | if n != 0 { |
| | | return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) |
| | | rep := bhome_msg.MsgRequestTopicReply{ |
| | | Data: data, |
| | | } |
| | | return nil |
| | | if bhsgo.SendReply(src, &rep) { |
| | | return nil |
| | | } |
| | | |
| | | return errors.New("reply return false") |
| | | } |
| | | |
| | | |
| | | //只发送请求,不需要应答. |
| | | //暴露在上层的,只有topic,没有key。 |
| | | func (h *BHBus) SendOnly(key int, arg *MsgInfo) error { |
| | | data,err := json.Marshal(*arg) |
| | | if err != nil { |
| | | return err |
| | | func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) { |
| | | dest := &bhome_msg.BHAddress{} |
| | | topic := &bhome_msg.MsgQueryProc{} |
| | | rep := &bhome_msg.MsgQueryProcReply{} |
| | | if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) { |
| | | return rep.ProcList, nil |
| | | } else { |
| | | return nil, errors.New("QueryProcs ret flase") |
| | | } |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut) |
| | | if n != 0 { |
| | | return fmt.Errorf("sendOnly ret n:%d", n) |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) { |
| | | data, err := json.Marshal(*req) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: KEY_QUERY, |
| | | }) |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) |
| | | h.printLog("requestCenter n:", n, "len(ret):", len(ret)) |
| | | if n > 0 && len(ret) > 0{ |
| | | var cr CommonReply |
| | | if err = json.Unmarshal(ret[0].Data, &cr); err == nil { |
| | | return &cr, nil |
| | | } else { |
| | | h.printLog("unmarshal to CommonReply err:", err) |
| | | } |
| | | } |
| | | return nil, fmt.Errorf("request center err") |
| | | } |
| | | |
| | | |
| | | //向主题通道中发布消息 |
| | | func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { |
| | | data,err := json.Marshal(*msg) |
| | | if err == nil { |
| | | if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 { |
| | | return nil |
| | | } else { |
| | | return fmt.Errorf("pub err n:%d", n) |
| | | } |
| | | func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error { |
| | | if bhsgo.Publish(msg, h.conf.pubTimeOut) { |
| | | return nil |
| | | } else { |
| | | return fmt.Errorf("pub err ") |
| | | } |
| | | } |
| | | |
| | | return err |
| | | func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int { |
| | | if bhsgo.Publish(msg, timeout) { |
| | | return 1 |
| | | } |
| | | return -1 |
| | | } |
| | | |
| | | //追加订阅的主题消息 |
| | | func (h *BHBus) Sub(topics []string) { |
| | | if topics != nil { |
| | | for _,t := range topics { |
| | | h.sockSub.sock.Sub(t) |
| | | if topics != nil && len(topics) >0 { |
| | | var subList bhome_msg.MsgTopicList |
| | | for _, v := range topics { |
| | | subList.TopicList = append(subList.TopicList, []byte(v)) |
| | | } |
| | | |
| | | var subReply bhome_msg.MsgCommonReply |
| | | if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) { |
| | | h.printLog("sub topics") |
| | | } |
| | | } |
| | | } |
| | |
| | | //注销订阅的主题 |
| | | func (h *BHBus) DeSub(topics []string) { |
| | | if topics != nil { |
| | | for _,t := range topics { |
| | | if n := h.sockSub.sock.Desub(t); n != 0 { |
| | | h.printLog("DeSub topic:", t, " n:", n) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | //获取sub 或者需要reply的消息 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | | if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil,nil, -1 |
| | | } |
| | | if len(h.chSub) >1 { |
| | | m := <-h.chSub |
| | | subMsg = m.info |
| | | } |
| | | if len(h.chReply) > 1 { |
| | | m := <-h.chReply |
| | | replyMsg = m.info |
| | | replyKey = m.port |
| | | } |
| | | return |
| | | } |