// Package bhomeclient wraps the bhsgo bindings: it registers a process on the
// bus, registers/subscribes topics, and exchanges publish and request/reply
// messages through a BHBus handle.
package bhomeclient

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"sync"
	"time"
	"unsafe"

	"basic.com/valib/bhshmq.git/api/bhsgo"
	"basic.com/valib/bhshmq.git/proto/source/bhome_msg"
)

// MsgReq is one request read by recvRequestRoutine and delivered on
// BHBus.ChReply.
type MsgReq struct {
	// ProcId is the process id filled in by bhsgo.ReadRequest.
	ProcId string
	// The embedded request carries the topic and payload.
	bhome_msg.MsgRequestTopic
	// Src is the opaque handle filled in by bhsgo.ReadRequest; pass it back to
	// BHBus.Reply to answer this request.
	Src unsafe.Pointer
}

// BHBus is the handle returned by Register. Received subscription messages
// arrive on ChSub and received requests arrive on ChReply.
type BHBus struct {
	ctx context.Context

	ri *RegisterInfo

	conf *Config

	// nodes holds the state of the nodes in the cluster together with the
	// topics on each node. In cluster mode it has to be kept in sync via serf.
	nodes []NodeInfo
	// mtxNode must be held while accessing the node/topic table.
	mtxNode sync.Mutex

	// wg tracks the receive goroutines started by Register; Free waits on it.
	wg *sync.WaitGroup

	ChSub   chan bhome_msg.MsgPublish
	ChReply chan MsgReq
}

// recvRequestRoutine receives the requests other processes send to this
// socket and forwards them on ch until ctx is cancelled. It calls wg.Done
// exactly once, when it returns.
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) {
	defer wg.Done()

	var procId string
	var msg bhome_msg.MsgRequestTopic
	var src unsafe.Pointer

	for {
		select {
		case <-ctx.Done():
			logFn("recvRoutine ctx.Done")
			return
		default:
		}

		if !bhsgo.ReadRequest(&procId, &msg, &src, 100) {
			time.Sleep(100 * time.Millisecond)
			continue
		}

		// Also watch ctx while sending: if the consumer has stopped reading, a
		// bare send would block forever and Free would never return.
		select {
		case ch <- MsgReq{ProcId: procId, MsgRequestTopic: msg, Src: src}:
		case <-ctx.Done():
			logFn("recvRoutine ctx.Done")
			return
		}
		logFn("ReadRequest topic:", string(msg.Topic), " data:", string(msg.Data))

		// Clear the scratch variables before the next read.
		procId = ""
		msg.Reset()
		src = unsafe.Pointer(nil)
	}
}

// Register registers the process described by ri on the bus and returns the
// handle used for all further communication.
//
// Registration (and registration of ri.PubTopic, when non-empty) is retried
// once per second until it succeeds or a value is received on q, in which case
// an error is returned. When ri.PubTopic is non-empty a goroutine is started
// that feeds incoming requests into ChReply. When ri.SubTopic or
// ri.SubNetTopic is non-empty the topics are subscribed and a goroutine is
// started that feeds published messages into ChSub. Both goroutines stop when
// ctx is cancelled.
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) {
	if config == nil || ri == nil {
		return nil, errors.New("register: config and register info must not be nil")
	}

	handle := &BHBus{
		ctx:     ctx,
		conf:    config,
		ri:      ri,
		wg:      &sync.WaitGroup{},
		ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
		ChReply: make(chan MsgReq, config.chSize),
	}

	// Keep retrying for as long as registration fails.
	procI := bhome_msg.ProcInfo{
		ProcId: []byte(ri.Proc.ID),
		Name:   []byte(ri.Proc.Name),
	}
	var regReply bhome_msg.MsgCommonReply
loop:
	for {
		select {
		case <-q:
			handle.printLog("register <-q")
			return nil, errors.New("ctx is done")
		default:
			if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
				break loop
			}
			time.Sleep(time.Second)
		}
	}

	if len(ri.PubTopic) > 0 {
		topics := bhome_msg.MsgTopicList{}
		var regTopicReply bhome_msg.MsgCommonReply
		for _, t := range ri.PubTopic {
			topics.TopicList = append(topics.TopicList, []byte(t))
		}

	loopRT:
		for {
			select {
			case <-q:
				handle.printLog("RegisterTopics recv quit signal")
				return nil, errors.New("RegisterTopics recv quit signal")
			default:
				if bhsgo.RegisterTopics(&topics, &regTopicReply, handle.conf.sendTimeOut) {
					handle.printLog("bhsgo.RegisterTopics success!!")
					break loopRT
				}
				time.Sleep(time.Second)
			}
		}

		handle.wg.Add(1)
		go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
	}

	handle.printLog("register done!")

	// NOTE: handle.wg must not be replaced here. The request goroutine above
	// already holds it, and Free has to wait for that goroutine as well.

	// A receiving goroutine is only needed when something is subscribed.
	if len(ri.SubTopic) > 0 {
		handle.printLog("sub topics")
		var subList bhome_msg.MsgTopicList
		for _, v := range ri.SubTopic {
			subList.TopicList = append(subList.TopicList, []byte(v))
		}

		var subReply bhome_msg.MsgCommonReply
		if !bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
			handle.printLog("bhsgo.Subscribe ret false")
		}
	}

	if len(ri.SubNetTopic) > 0 {
		handle.printLog("sub net topics")
		var subNetList bhome_msg.MsgTopicList
		for _, v := range ri.SubNetTopic {
			subNetList.TopicList = append(subNetList.TopicList, []byte(v))
		}

		var subNetReply bhome_msg.MsgCommonReply
		if !bhsgo.SubscribeNet(&subNetList, &subNetReply, handle.conf.sendTimeOut) {
			handle.printLog("bhsgo.SubscribeNet ret false")
		}
	}

	if len(ri.SubTopic) > 0 || len(ri.SubNetTopic) > 0 {
		// Start receiving subscribed messages.
		handle.wg.Add(1)
		go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
	}

	return handle, nil
}

// recvSubRoutine reads published messages with bhsgo.ReadSub and forwards them
// on ch until ctx is cancelled. It calls wg.Done exactly once, when it returns.
func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) {
	defer wg.Done()

	var procId string
	var msg bhome_msg.MsgPublish

	for {
		select {
		case <-ctx.Done():
			logFn("recvRoutine ctx.Done")
			return
		default:
		}

		// No sleep on a failed read (the original had it commented out);
		// presumably ReadSub itself waits up to the given timeout — TODO confirm.
		if !bhsgo.ReadSub(&procId, &msg, 100) {
			continue
		}

		// Also watch ctx while sending so shutdown cannot hang on a full
		// channel that nobody drains.
		select {
		case ch <- msg:
		case <-ctx.Done():
			logFn("recvRoutine ctx.Done")
			return
		}
		logFn("ReadSub topic:", string(msg.Topic), " data:", string(msg.Data))

		procId = ""
		msg.Reset()
	}
}

// DeRegister currently only logs the call and always returns nil; dri is not
// used.
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
	h.printLog("DeRegister")

	return nil
}

// printLog forwards v to the logging callback from the config, if one is set.
func (h *BHBus) printLog(v ...interface{}) {
	if h.conf.fnLog != nil {
		h.conf.fnLog(v...)
	}
}

// Free waits for the receive goroutines started by Register to finish and then
// calls bhsgo.Cleanup. The goroutines only exit once the context passed to
// Register is cancelled, so cancel it before calling Free.
func (h *BHBus) Free() {
	h.printLog("call BHBus free")
	h.wg.Wait()
	bhsgo.Cleanup()
	h.printLog("h.wg.Wait done")
}

// HeartBeat sends a heartbeat for the registered process. It returns an error
// when bhsgo.Heartbeat reports failure.
func (h *BHBus) HeartBeat() error {
	procI := bhome_msg.ProcInfo{
		ProcId: []byte(h.ri.Proc.ID),
		Name:   []byte(h.ri.Proc.Name),
	}
	var ret bhome_msg.MsgCommonReply
	if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
		return nil
	}
	return errors.New("send heartBeat return false")
}

// UpdateNodeTopics replaces the stored node/topic list with arr. The slice is
// stored as passed, not copied.
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
	h.mtxNode.Lock()
	defer h.mtxNode.Unlock()
	h.nodes = arr
}

// GetNetNodeByTopic queries the node addresses that serve topic.
//
// The original intent was: a non-empty serverId selects the topic key on that
// machine, an empty one selects the topic key on all nodes.
// NOTE(review): the current implementation uses neither serverId nor srcProc;
// it always queries with an empty destination address.
func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) {
	dest := bhome_msg.BHAddress{}
	reqTopic := bhome_msg.MsgQueryTopic{
		Topic: []byte(topic),
	}
	rep := bhome_msg.MsgQueryTopicReply{}
	if bhsgo.QueryTopicAddress(&dest, &reqTopic, &rep, h.conf.sendTimeOut) {
		return rep.NodeAddress, nil
	}

	// Prefer the error reported in the reply when there is one.
	if rep.Errmsg != nil {
		h.printLog("QueryTopicAddress errCode:", rep.Errmsg.ErrCode, "errMsg:", string(rep.Errmsg.ErrString))
		return nil, errors.New(string(rep.Errmsg.ErrString))
	}
	return nil, errors.New("bhsgo.QueryTopicAddress ret false")
}

// Request sends req with a timeout of milliSecs and decodes the JSON payload
// of the answer into a Reply.
//
// The intended flow was (1) look up the NetNode for the topic on this machine
// and (2) send the request to that server and wait for the result.
// NOTE(review): serverId is currently unused; the request is always sent with
// an empty destination address.
func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
	pid := ""
	mrt := bhome_msg.MsgRequestTopicReply{}
	dest := bhome_msg.BHAddress{}
	if !bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
		i, s := bhsgo.GetLastError()
		h.printLog(" lastErr i:", i, " msg:", s, " topic:", string(req.Topic))
		return nil, errors.New("request ")
	}

	var reply Reply
	if err := json.Unmarshal(mrt.Data, &reply); err != nil {
		return nil, err
	}
	return &reply, nil
}

// RequestOnly sends req using the configured send timeout and returns the raw
// reply payload without decoding it. When destArr has a first element with a
// non-nil address that address is used as the destination; otherwise an empty
// address is used.
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []*bhome_msg.MsgQueryTopicReply_BHNodeAddress) ([]byte, error) {
	dest := bhome_msg.BHAddress{}
	if len(destArr) > 0 && destArr[0] != nil && destArr[0].Addr != nil {
		dest = *(destArr[0].Addr)
	}

	pid := ""
	r := bhome_msg.MsgRequestTopicReply{}
	if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
		return r.Data, nil
	}

	i, s := bhsgo.GetLastError()
	h.printLog("bhsgo.Request request lastErr i:", i, " msg:", s, " topic:", string(req.Topic), " dest:", dest)
	return nil, errors.New("bhsgo.Request return false")
}

// Reply JSON-encodes i and sends it as the answer to the request identified by
// src (the Src field of the MsgReq being answered).
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
	if i == nil {
		return errors.New("reply is nil")
	}

	data, err := json.Marshal(*i)
	if err != nil {
		return err
	}

	rep := bhome_msg.MsgRequestTopicReply{
		Data: data,
	}
	if bhsgo.SendReply(src, &rep) {
		return nil
	}
	return errors.New("reply return false")
}

// RequestCenter returns the process list reported by bhsgo.QueryProcs, called
// with an empty destination address and an empty query.
func (h *BHBus) RequestCenter() ([]*bhome_msg.MsgQueryProcReply_Info, error) {
	dest := &bhome_msg.BHAddress{}
	topic := &bhome_msg.MsgQueryProc{}
	rep := &bhome_msg.MsgQueryProcReply{}
	if bhsgo.QueryProcs(dest, topic, rep, h.conf.sendTimeOut) {
		return rep.ProcList, nil
	}
	return nil, errors.New("QueryProcs ret flase")
}

// Pub publishes msg to its topic using the configured publish timeout.
// nodes is currently unused.
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
	if bhsgo.Publish(msg, h.conf.pubTimeOut) {
		return nil
	}
	return fmt.Errorf("pub err ")
}

// PubTimeout publishes msg with an explicit timeout. It returns 1 on success
// and -1 on failure. nodes is currently unused.
func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
	if bhsgo.Publish(msg, timeout) {
		return 1
	}
	return -1
}

// Sub subscribes to additional topics. An empty list is a no-op. The outcome
// is only logged because the method has no return value.
func (h *BHBus) Sub(topics []string) {
	if len(topics) == 0 {
		return
	}

	var subList bhome_msg.MsgTopicList
	for _, v := range topics {
		subList.TopicList = append(subList.TopicList, []byte(v))
	}

	var subReply bhome_msg.MsgCommonReply
	if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
		h.printLog("sub topics")
	} else {
		h.printLog("bhsgo.Subscribe ret false")
	}
}

// DeSub is meant to cancel the subscription of the given topics.
// It is not implemented yet and currently does nothing.
func (h *BHBus) DeSub(topics []string) {
}