| | |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | "strconv" |
| | | "sync" |
| | | "time" |
| | | ) |
| | |
| | | sockSub *sockClient //订阅主题的socket,线程实时接收消息,需要单独处理 |
| | | |
| | | sockWorker *sockClient //发给任意的server,短暂的request client |
| | | mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的 |
| | | //mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的 |
| | | |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | | } |
| | | |
| | | //获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。 |
| | |
| | | wg.Done() |
| | | return |
| | | default: |
| | | n := s.RecvfromTimeout(&data, &key, 1000) |
| | | logFn("recvRoutine Recvfrom n:", n) |
| | | n := s.RecvfromTimeout(&data, &key, 1000) //目前10001返回值表示超时 |
| | | if n == 0 { |
| | | var info MsgInfo |
| | | if err := json.Unmarshal(data, &info);err == nil { |
| | |
| | | |
| | | data = []byte{} |
| | | key = 0 |
| | | } else { |
| | | logFn("unmarshal to MsgInfo err:", err) |
| | | } |
| | | } else { |
| | | time.Sleep(10 * time.Millisecond) |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func recvandsendRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logFn("recvandsendRoutine ctx.Done") |
| | | wg.Done() |
| | | return |
| | | default: |
| | | n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时 |
| | | if n != 0 { |
| | | logFn("RecvandsendTimeout success") |
| | | } else { |
| | | //time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | //Register |
| | | func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) { |
| | | handle := &BHBus{ |
| | | handle := &BHBus { |
| | | ctx: ctx, |
| | | conf: config, |
| | | m: make(map[string]*sockServer), |
| | | chSub: make(chan TransInfo, config.chSize), |
| | | chReply: make(chan TransInfo, config.chSize), |
| | | } |
| | | |
| | | var err error |
| | |
| | | } |
| | | |
| | | var rMsg []bhomebus.Mesg |
| | | n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n代表成功发送的节点的个数 |
| | | handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg)) |
| | | n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n代表成功发送的节点的个数 |
| | | handle.printLog("regSock.SendandrecvTimeout n:", n) |
| | | if n == 1 && len(rMsg) == 1 { |
| | | var cr CommonReply |
| | | var cr Reply |
| | | if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil { |
| | | handle.printLog("unmarshal regReply err:", err) |
| | | return nil, errors.New("unmarshal regReply err:"+err.Error()) |
| | | } else { |
| | | if cr.Status == REPLY_SUCCESS { |
| | | var rr RegisterReply |
| | | if err = json.Unmarshal(cr.Body, &rr);err ==nil { |
| | | regR = &rr |
| | | break loop |
| | | if cr.Success { |
| | | if rpd,err := json.Marshal(cr.Data);err ==nil { |
| | | var rr RegisterReply |
| | | if err = json.Unmarshal(rpd, &rr); err == nil { |
| | | regR = &rr |
| | | break loop |
| | | } else { |
| | | handle.printLog("unmarshal RegisterReply err:", err) |
| | | } |
| | | } else { |
| | | handle.printLog("unmarshal RegisterReply err:", err) |
| | | handle.printLog("marshal cr.Data err:", err) |
| | | } |
| | | |
| | | } else { |
| | | handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc) |
| | | handle.printLog("cr:", cr) |
| | | } |
| | | |
| | | } |
| | | } else { |
| | | time.Sleep(100 * time.Millisecond) |
| | | time.Sleep(1 * time.Second) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | //维持心跳的socket |
| | | sockHB := bhomebus.OpenSocket() |
| | | handle.printLog("open sockHB") |
| | | handle.sockHB = &sockClient{ |
| | | sock: sockHB, |
| | | peer: int(regR.HeartbeatKey), |
| | |
| | | if ri.PubTopic != nil && len(ri.PubTopic) > 0 { |
| | | sockReply := bhomebus.OpenSocket() |
| | | sockReply.ForceBind(int(regR.ReplyKey)) |
| | | handle.wg.Add(1) |
| | | handle.printLog("after pubTopic forceBind") |
| | | //handle.wg.Add(1) |
| | | //serve server reply |
| | | go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | //go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog) |
| | | handle.sockRep = &sockServer{ |
| | | sock: sockReply, |
| | | info: &ri.Proc, |
| | |
| | | //订阅消息的socket |
| | | sockSub := bhomebus.OpenSocket() |
| | | //订阅所有主题 |
| | | handle.printLog("start Sub topics") |
| | | for _,v := range ri.SubTopic { |
| | | sockSub.Sub(v) |
| | | subN := sockSub.Sub(v) |
| | | handle.printLog("subTopic:", v, " ret n:", subN) |
| | | } |
| | | |
| | | //启动订阅信息接收 |
| | |
| | | return err |
| | | } |
| | | |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | //h.mtxWorker.Lock() |
| | | //defer h.mtxWorker.Unlock() |
| | | netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: h.conf.regKey, |
| | | }) |
| | |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut) |
| | | if n > 0 { |
| | | var reply CommonReply |
| | | var reply Reply |
| | | err = json.Unmarshal(ret[0].Data, &reply) |
| | | if err != nil { |
| | | h.printLog("unmarshal err:", err) |
| | | return nil, err |
| | | } |
| | | |
| | | if reply.Status == REPLY_SUCCESS { |
| | | err = json.Unmarshal(reply.Body, &nodes) |
| | | if reply.Success { |
| | | rd,err := json.Marshal(reply.Data) |
| | | if err == nil { |
| | | return nodes, nil |
| | | err = json.Unmarshal(rd, &nodes) |
| | | if err == nil { |
| | | return nodes, nil |
| | | } else { |
| | | h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data) |
| | | return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error()) |
| | | } |
| | | } else { |
| | | h.printLog("unmarshal err:", err, "nodes:", nodes) |
| | | return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error()) |
| | | return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error()) |
| | | } |
| | | |
| | | } else { |
| | | h.printLog("reply status:", reply.Status, "desc:", reply.Desc, "body:", string(reply.Body)) |
| | | return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status) |
| | | h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data) |
| | | return nil, fmt.Errorf("REPLY msg:%s", reply.Msg) |
| | | } |
| | | } else { |
| | | return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n) |
| | | } |
| | | } |
| | | |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) { |
| | | func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) { |
| | | //1.首先需要通过topic拿到本机对应的NetNode |
| | | rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic) |
| | | h.printLog("topic", req.Topic, "rNodes:", rNodes, "err:", err) |
| | |
| | | var ret []bhomebus.Mesg |
| | | |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs) |
| | | h.printLog("Request n: ", n, " len(ret): ", len(ret)) |
| | | |
| | | if n > 0 && len(ret) > 0 { |
| | | if err = json.Unmarshal(ret[0].Data, resp); err == nil { |
| | | return resp, nil |
| | | var resp Reply |
| | | if err = json.Unmarshal(ret[0].Data, &resp); err == nil { |
| | | return &resp, nil |
| | | } else { |
| | | h.printLog("unmarshal ret[0].Data err:", err) |
| | | return nil, err |
| | | } |
| | | } else { |
| | | h.printLog("Request n: ", n, " len(ret): ", len(ret)) |
| | | } |
| | | return nil, fmt.Errorf("request err") |
| | | } |
| | | |
| | | func (h *BHBus) Reply(replyKey int, i MsgInfo) error { |
| | | data,err := json.Marshal(i) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) { |
| | | var ret []bhomebus.Mesg |
| | | |
| | | n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) |
| | | h.printLog("reply to key:", replyKey, " n:",n) |
| | | if n != 0 { |
| | | return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut) |
| | | |
| | | if n > 0 && len(ret) > 0 { |
| | | return ret[0].Data, nil |
| | | } else { |
| | | h.printLog("Request n: ", n, " len(ret): ", len(ret)) |
| | | } |
| | | return nil |
| | | return nil, fmt.Errorf("request err") |
| | | } |
| | | |
| | | //func (h *BHBus) Reply(replyKey int, i *Reply) error { |
| | | // data,err := json.Marshal(*i) |
| | | // if err != nil { |
| | | // return err |
| | | // } |
| | | // |
| | | // n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut) |
| | | // h.printLog("reply to key:", replyKey, " n:",n) |
| | | // if n != 0 { |
| | | // return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n)) |
| | | // } |
| | | // return nil |
| | | //} |
| | | |
| | | |
| | | //只发送请求,不需要应答. |
| | |
| | | if err != nil { |
| | | return err |
| | | } |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | //h.mtxWorker.Lock() |
| | | //defer h.mtxWorker.Unlock() |
| | | n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut) |
| | | if n != 0 { |
| | | return fmt.Errorf("sendOnly ret n:%d", n) |
| | |
| | | return nil |
| | | } |
| | | |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) { |
| | | func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) { |
| | | data, err := json.Marshal(*req) |
| | | if err != nil { |
| | | return nil, err |
| | |
| | | rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{ |
| | | Key: KEY_QUERY, |
| | | }) |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | //h.mtxWorker.Lock() |
| | | //defer h.mtxWorker.Unlock() |
| | | var ret []bhomebus.Mesg |
| | | n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut) |
| | | h.printLog("requestCenter n:", n, "len(ret):", len(ret)) |
| | | if n > 0 && len(ret) > 0{ |
| | | var cr CommonReply |
| | | var cr Reply |
| | | if err = json.Unmarshal(ret[0].Data, &cr); err == nil { |
| | | return &cr, nil |
| | | } else { |
| | |
| | | func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error { |
| | | data,err := json.Marshal(*msg) |
| | | if err == nil { |
| | | if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 { |
| | | if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 { |
| | | return nil |
| | | } else { |
| | | return fmt.Errorf("pub err n:%d", n) |
| | |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int { |
| | | data,err := json.Marshal(*msg) |
| | | if err == nil { |
| | | return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout) |
| | | } |
| | | return -1 |
| | | } |
| | | |
| | | //追加订阅的主题消息 |
| | |
| | | |
| | | |
| | | //获取sub 或者需要reply的消息 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | | if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil,nil, -1 |
| | | func (h *BHBus) GetMsg() (subMsg *MsgInfo) { |
| | | if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil { |
| | | return nil |
| | | } |
| | | if len(h.chSub) >1 { |
| | | if len(h.chSub) >0 { |
| | | m := <-h.chSub |
| | | subMsg = m.info |
| | | } |
| | | if len(h.chReply) > 1 { |
| | | m := <-h.chReply |
| | | replyMsg = m.info |
| | | replyKey = m.port |
| | | } |
| | | return |
| | | } |