package bhomeclient import ( "basic.com/valib/bhomebus.git" "context" "encoding/json" "errors" "fmt" "os" "strconv" "sync" "time" ) type sockServer struct { sock *bhomebus.Socket info *ProcInfo } type sockClient struct { sock *bhomebus.Socket peer int } type TransInfo struct { info *MsgInfo port int } type BHBus struct { ctx context.Context conf *Config nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步 mtxNode sync.Mutex //访问节点主题表时,需要加锁 m map[string]*sockServer wg *sync.WaitGroup sockRep *sockServer //响应其他进程request的socket,server sockHB *sockClient //维持心跳的socket,线程实时发送,需要单独处理 sockPub *sockClient //发布主题的socket,需要单独socket处理 sockSub *sockClient //订阅主题的socket,线程实时接收消息,需要单独处理 sockWorker *sockClient //发给任意的server,短暂的request client //mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的 chSub chan TransInfo chReply chan TransInfo } //获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。 func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) { var data []byte var key int for { select { case <-ctx.Done(): logFn("recvRoutine ctx.Done") wg.Done() return default: n := s.RecvfromTimeout(&data, &key, 1000) //目前10001返回值表示超时 if n == 0 { var info MsgInfo if err := json.Unmarshal(data, &info);err == nil { ch <- TransInfo{ info: &info, port: key, //这个key在发布订阅模式中是bus的key,是个固定值,上层用不到 } data = []byte{} key = 0 } else { logFn("unmarshal to MsgInfo err:", err) } } else { time.Sleep(100 * time.Millisecond) } } } } //func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { // for { // select { // case <-ctx.Done(): // logFn("recvandsendRoutine ctx.Done") // wg.Done() // return // default: // n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时 // if n != 0 { // logFn("RecvandsendTimeout success") // } else { // //time.Sleep(100 * time.Millisecond) // } // } // } //} //Register func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { handle := &BHBus { ctx: ctx, conf: config, m: make(map[string]*sockServer), chSub: make(chan TransInfo, config.chSize), chReply: make(chan TransInfo, config.chSize), } var err error err = bhomebus.Init("libshm_queue.so") if err != nil { handle.printLog("Init so err:", err) return nil, err } err = bhomebus.ShmInit(512) if err != nil { handle.printLog("shmInit size err:", err) return nil, err } regSock := bhomebus.OpenSocket() if regSock == nil { handle.printLog("Open Socket ret Nil") return nil, errors.New("OpenSocket ret Nil") } defer func() { regSock.Close() handle.printLog("regSock.CLose") }() var msg []byte var regAddr []bhomebus.NetNode var regR *RegisterReply //注册结果信息 //如果注册失败,就会一直尝试注册 loop: for { select { case <-q: handle.printLog("register <-q") return nil,errors.New("ctx is done") default: if msg == nil { rid, err := json.Marshal(*ri) if err != nil { handle.printLog("marshal registerInfo err:", err) return nil, errors.New("marshal registerInfo err:"+err.Error()) } s := MsgInfo{ SrcProc: ri.Proc, MsgType: MesgType_ReqRep, Topic: TOPIC_REGISTER, Body: rid, } handle.printLog("register MsgInfo:", s) dRegData,err := json.Marshal(s) if err != nil { handle.printLog("marshal deregister msg err:", err) return nil, err } handle.printLog(string(dRegData)) msg = dRegData } if regAddr == nil { regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{ Key: handle.conf.regKey, }) } var rMsg []bhomebus.Mesg n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n代表成功发送的节点的个数 handle.printLog("regSock.SendandrecvTimeout n:", n) if n == 1 && len(rMsg) == 1 { var cr Reply if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { handle.printLog("unmarshal regReply err:", err) return nil, errors.New("unmarshal regReply err:"+err.Error()) } else { if cr.Success { if rpd,err := json.Marshal(cr.Data);err ==nil { var rr RegisterReply if err = json.Unmarshal(rpd, &rr); err == nil { regR = &rr break loop } else { handle.printLog("unmarshal RegisterReply err:", err) } } else { handle.printLog("marshal cr.Data err:", err) } } else { handle.printLog("cr:", cr) } } } else { time.Sleep(1 * time.Second) } } } handle.printLog("register Reply:", *regR) for _, v := range ri.Channel { if k,ok := regR.ChannelKey[v];ok { s := bhomebus.OpenSocket() s.ForceBind(int(k)) handle.m[v] = &sockServer{ sock: s, info: &ri.Proc, } } } //维持心跳的socket sockHB := bhomebus.OpenSocket() handle.printLog("open sockHB") handle.sockHB = &sockClient{ sock: sockHB, peer: int(regR.HeartbeatKey), } handle.wg = &sync.WaitGroup{} if ri.PubTopic != nil && len(ri.PubTopic) > 0 { sockReply := bhomebus.OpenSocket() sockReply.ForceBind(int(regR.ReplyKey)) handle.printLog("after pubTopic forceBind") handle.wg.Add(1) //serve server reply go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) handle.sockRep = &sockServer{ sock: sockReply, info: &ri.Proc, } } //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key sockPub := bhomebus.OpenSocket() handle.sockPub = &sockClient{ sock: sockPub, peer: -1, } //有订阅消息才需要启动协程接收消息 if ri.SubTopic != nil && len(ri.SubTopic) > 0 { //订阅消息的socket sockSub := bhomebus.OpenSocket() //订阅所有主题 handle.printLog("start Sub topics") for _,v := range ri.SubTopic { subN := sockSub.Sub(v) handle.printLog("subTopic:", v, " ret n:", subN) } //启动订阅信息接收 handle.wg.Add(1) go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog) handle.sockSub = &sockClient{ sock: sockSub, peer: -1, } } sockWorker := bhomebus.OpenSocket() handle.sockWorker = &sockClient{ sock: sockWorker, peer: int(regR.QueryTopicKey), } return handle, nil } //DeRegister func (h *BHBus) DeRegister(dri *RegisterInfo) error { data, err := json.Marshal(*dri) if err != nil { return err } dRegData,err := json.Marshal(MsgInfo{ MsgType: "", Topic: TOPIC_DEREGISTER, Body: data, }) if err != nil { return err } //h.mtxWorker.Lock() //defer h.mtxWorker.Unlock() netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ Key: h.conf.regKey, }) var retMsg []bhomebus.Mesg n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut) if n == 0 { return nil } h.printLog("DeRegister retMsg:", retMsg) return fmt.Errorf("DeRegister n:%d", n) } func (h *BHBus) printLog(v ...interface{}) { if h.conf.fnLog != nil { h.conf.fnLog(v...) } } //Release func (h *BHBus) Free() { h.printLog("call BHBus free") h.wg.Wait() h.printLog("h.wg.Wait done") for _,v := range h.m { v.sock.Close() } if h.sockRep != nil { h.sockRep.sock.Close() h.sockRep = nil } if h.sockHB != nil { h.sockHB.sock.Close() h.sockHB = nil } if h.sockPub != nil { h.sockPub.sock.Close() h.sockPub = nil } if h.sockSub != nil { h.sockSub.sock.Close() h.sockSub = nil } if h.sockWorker != nil { h.sockWorker.sock.Close() h.sockWorker = nil } h.printLog("BHBus Freed") } //HeartBeat send func (h *BHBus) HeartBeat(info *HeartBeatInfo) error { data, err := json.Marshal(*info) if err == nil { hbd,err := json.Marshal(MsgInfo{ SrcProc: info.Proc, MsgType: MesgType_ReqRep, Topic: TOPIC_HEARTBEAT, Body: data, }) if err != nil { h.printLog("marshal heartbeat msgInfo err:", err) return err } var rMsg []bhomebus.Mesg hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{ Key: h.sockHB.peer, }) //h.printLog("start send heartbeat") n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n代表成功发送的节点的个数 //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) if n > 0 { return nil } else { return fmt.Errorf("sockHB Sendandrecv ret n:%d", n) } } return err } //func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error { // n := s.sock.SendtoTimeout(data, s.peer, timeout) // if n == 0 { // return nil // } // return errors.New("SendtoTimeout n:"+strconv.Itoa(n)) //} //更新主题列表 func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) { h.mtxNode.Lock() defer h.mtxNode.Unlock() h.nodes = arr } //获取topic对应的key //如果传了serverId不为空,则获取指定机器上的topic-key //如果server为空,则获取所有节点上topic-key func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) { h.mtxNode.Lock() defer h.mtxNode.Unlock() var nodes []bhomebus.NetNode reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{ Key: h.sockWorker.peer, }) reqD,err := json.Marshal(MsgInfo{ SrcProc: *srcProc, MsgType: MesgType_ReqRep, Topic: TOPIC_QUERYKEY, Body: []byte(topic), }) if err != nil { return nil, fmt.Errorf("marshal req err:%s", err.Error()) } var ret []bhomebus.Mesg n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) if n > 0 { var reply Reply err = json.Unmarshal(ret[0].Data, &reply) if err != nil { h.printLog("unmarshal err:", err) return nil, err } if reply.Success { rd,err := json.Marshal(reply.Data) if err == nil { err = json.Unmarshal(rd, &nodes) if err == nil { return nodes, nil } else { h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data) return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error()) } } else { return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) } } else { h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data) return nil, fmt.Errorf("REPLY msg:%s", reply.Msg) } } else { return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) } } func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) { //1.首先需要通过topic拿到本机对应的NetNode rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) if err != nil { h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err) return nil, err } if rNodes == nil || len(rNodes) == 0 { return nil, errors.New("rNodes empty, topic: "+ req.Topic) } //2.将请求返送到对应的server,并等待返回值 data, err := json.Marshal(*req) if err != nil { h.printLog("marshal(*req) err:", err) return nil, err } var ret []bhomebus.Mesg n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs) if n > 0 && len(ret) > 0 { var resp Reply if err = json.Unmarshal(ret[0].Data, &resp); err == nil { return &resp, nil } else { h.printLog("unmarshal ret[0].Data err:", err) return nil, err } } else { h.printLog("Request n: ", n, " len(ret): ", len(ret)) return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) } } func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) { var ret []bhomebus.Mesg n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut) if n > 0 && len(ret) > 0 { return ret[0].Data, nil } else { h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData)) return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n) } } func (h *BHBus) Reply(replyKey int, i *Reply) error { data,err := json.Marshal(*i) if err != nil { return err } n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) h.printLog("reply to key:", replyKey, " n:",n) if n != 0 { return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) } return nil } //只发送请求,不需要应答. //暴露在上层的,只有topic,没有key。 func (h *BHBus) SendOnly(key int, arg *MsgInfo) error { data,err := json.Marshal(*arg) if err != nil { return err } //h.mtxWorker.Lock() //defer h.mtxWorker.Unlock() n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut) if n != 0 { return fmt.Errorf("sendOnly ret n:%d", n) } return nil } func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) { data, err := json.Marshal(*req) if err != nil { return nil, err } rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ Key: KEY_QUERY, }) //h.mtxWorker.Lock() //defer h.mtxWorker.Unlock() var ret []bhomebus.Mesg n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) h.printLog("requestCenter n:", n, "len(ret):", len(ret)) if n > 0 && len(ret) > 0{ var cr Reply if err = json.Unmarshal(ret[0].Data, &cr); err == nil { return &cr, nil } else { h.printLog("unmarshal to CommonReply err:", err) } } return nil, fmt.Errorf("request center err") } //向主题通道中发布消息 func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { data,err := json.Marshal(*msg) if err == nil { if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 { return nil } else { return fmt.Errorf("pub err n:%d", n) } } return err } func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int { data,err := json.Marshal(*msg) if err == nil { return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout) } return -1 } //追加订阅的主题消息 func (h *BHBus) Sub(topics []string) { if topics != nil { for _,t := range topics { h.sockSub.sock.Sub(t) } } } //注销订阅的主题 func (h *BHBus) DeSub(topics []string) { if topics != nil { for _,t := range topics { if n := h.sockSub.sock.Desub(t); n != 0 { h.printLog("DeSub topic:", t, " n:", n) } } } } //获取sub 或者需要reply的消息 //func (h *BHBus) GetMsg() (subMsg *MsgInfo) { // if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { // return nil // } // if len(h.chSub) >0 { // m := <-h.chSub // subMsg = m.info // } // return //} func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { return nil, nil, -1 } if len(h.chSub) >0 { m := <-h.chSub subMsg = m.info } if len(h.chReply) >0 { m := <-h.chReply replyMsg = m.info replyKey = m.port } return }