package softbus

import (
	"context"
	"fmt"
	"time"

	"github.com/golang/protobuf/proto"
)

const (
	// RegKey is the fixed key Register sends the registration request to.
	RegKey = 12
	// GetTopicInfoTypeTopic is the GetTopicInfo type string for topics.
	GetTopicInfoTypeTopic = "gettopic"
	// GetTopicInfoTypeChannel is the GetTopicInfo type string for channels.
	GetTopicInfoTypeChannel = "getchannel"
)

// subOReply pairs a bound (server-side) socket with the process info
// it was created for.
type subOReply struct {
	sock *DgramSocket
	info *ProcInfo
}

// sockRe pairs a socket with the peer key it sends to. Register sets
// peer to -1 for sockets that have no fixed peer.
type sockRe struct {
	sock *DgramSocket
	peer int
}

// TransInfo carries a received message together with the peer key it
// was received from.
type TransInfo struct {
	info *MsgInfo
	port int
}

// Handle holds every socket and channel created by Register.
type Handle struct {
	ctx context.Context
	// m maps a channel name to the reply socket bound for it (server
	// side), waiting to be read from. One of them must act as the
	// server for Request.
	m map[string]*subOReply
	// sockHB is the heartbeat connection (client); it only sends
	// heartbeat messages.
	sockHB *sockRe
	// sockUp is the topic-update connection (client); it only sends
	// topic updates.
	sockUp *sockRe
	// sockSub is the subscription socket.
	sockSub *subOReply
	// sockRep is the reply socket that serves Request callers.
	sockRep *sockRe
	// sockWorker is a general-purpose client socket that can send to
	// any server.
	sockWorker *sockRe

	// chSub receives messages read from sockSub.
	chSub chan TransInfo
	// chReply receives messages read from sockRep.
	chReply chan TransInfo
}

// garbageCollect blocks until ctx is done and then closes every socket
// owned by h.
func garbageCollect(ctx context.Context, h *Handle) {
	<-ctx.Done()
	for _, v := range h.m {
		v.sock.Close()
	}
	h.sockHB.sock.Close()
	h.sockUp.sock.Close()
	h.sockSub.sock.Close()
	h.sockRep.sock.Close()
	h.sockWorker.sock.Close()
}

// recvRoutine reads datagrams from sock until ctx is done, decodes each
// one as a MsgInfo and forwards it on ch together with the sender's
// key. Datagrams that fail to receive or decode are dropped.
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		data, peer, err := sock.RecvFrom()
		if err != nil {
			continue
		}
		// Unmarshal needs an allocated message; a nil *MsgInfo can
		// never be filled in.
		info := &MsgInfo{}
		if err := proto.Unmarshal(data, info); err != nil {
			continue
		}
		// Do not block forever on a full channel once ctx is cancelled.
		select {
		case ch <- TransInfo{info: info, port: peer}:
		case <-ctx.Done():
			return
		}
	}
}

// Register sends info to the server manager at RegKey, retrying until a
// reply arrives or ctx is done, then opens the sockets described by the
// reply and returns a Handle that owns them.
//
// It returns nil when ctx is cancelled before a reply arrives, when
// info cannot be marshalled, when the reply cannot be decoded, or when
// any socket cannot be opened. On failure every socket opened so far is
// closed. On success the sockets are closed when ctx is done.
func Register(ctx context.Context, info *RegisterInfo) *Handle {
	// Delay between registration attempts so a failing SendAndRecv does
	// not spin the CPU.
	const retryInterval = 100 * time.Millisecond

	// Request the set of keys first.
	sockReg := OpenDgramSocket()
	if sockReg == nil {
		return nil
	}
	defer sockReg.Close()

	// Marshalling the same message is deterministic, so a failure here
	// is not worth retrying.
	msg, err := proto.Marshal(info)
	if err != nil {
		return nil
	}

	var rdata []byte
	for {
		if ctx.Err() != nil {
			return nil
		}
		if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
			break
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(retryInterval):
		}
	}

	// Decode the keys handed out by the server manager.
	var regReply RegisterInfoReply
	if err := proto.Unmarshal(rdata, &regReply); err != nil {
		return nil
	}

	// Track every socket opened below so a partial failure does not
	// leak the ones already opened.
	var opened []*DgramSocket
	committed := false
	defer func() {
		if committed {
			return
		}
		for _, s := range opened {
			s.Close()
		}
	}()
	open := func() *DgramSocket {
		s := OpenDgramSocket()
		if s != nil {
			opened = append(opened, s)
		}
		return s
	}

	// Sockets for the req/rep channels.
	// NOTE(review): Bind's result is not checked here because its
	// signature is not visible in this file — confirm whether it can fail.
	m := make(map[string]*subOReply)
	for _, v := range info.Channel {
		k, ok := regReply.ChannelKey[v]
		if !ok {
			continue
		}
		if _, dup := m[v]; dup {
			// A repeated channel name would overwrite the entry and
			// leak the earlier socket.
			continue
		}
		s := open()
		if s == nil {
			return nil
		}
		s.Bind(int(k))
		m[v] = &subOReply{
			sock: s,
			info: info.ProcInfo,
		}
	}

	// Heartbeat uses its own socket.
	sockHB := open()
	if sockHB == nil {
		return nil
	}
	// Topic updates use their own socket.
	sockUp := open()
	if sockUp == nil {
		return nil
	}
	// Subscriptions use their own bound socket.
	sockSub := open()
	if sockSub == nil {
		return nil
	}
	sockSub.Bind(int(regReply.SubTopicKey))
	// Replies use their own bound socket, serving Request callers.
	sockReply := open()
	if sockReply == nil {
		return nil
	}
	sockReply.Bind(int(regReply.ReplyKey))
	// General-purpose socket, used as a client only.
	sockW := open()
	if sockW == nil {
		return nil
	}

	const chSize = 5
	chSub := make(chan TransInfo, chSize)
	chReply := make(chan TransInfo, chSize)

	handle := &Handle{
		ctx: ctx,
		m:   m,
		sockHB: &sockRe{
			sock: sockHB,
			peer: int(regReply.HeartbeatKey),
		},
		sockUp: &sockRe{
			sock: sockUp,
			peer: int(regReply.UpdateTopicKey),
		},
		sockSub: &subOReply{
			sock: sockSub,
			info: info.ProcInfo,
		},
		sockRep: &sockRe{
			sock: sockReply,
			peer: -1,
		},
		sockWorker: &sockRe{
			sock: sockW,
			peer: -1,
		},
		chSub:   chSub,
		chReply: chReply,
	}
	committed = true

	// Start the receivers only once every socket is known to be good.
	// The reply socket feeds chReply, which GetMesg reads replies from.
	go recvRoutine(ctx, sockSub, chSub)
	go recvRoutine(ctx, sockReply, chReply)
	go garbageCollect(ctx, handle)
	return handle
}

// GetTopicInfo returns the port of the socket registered under topic,
// or -1 when no such entry exists. typ is currently not consulted.
func (h *Handle) GetTopicInfo(topic, typ string) int {
	if v, ok := h.m[topic]; ok {
		return v.sock.Port()
	}
	return -1
}

// send2 sends data on sr's socket to sr's peer. A non-zero SendTo
// result is reported as an error prefixed with logID.
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
	if r := sr.sock.SendTo(data, sr.peer); r != 0 {
		return fmt.Errorf("%s SendTo Failed: %d", logID, r)
	}
	return nil
}

// HeartBeat marshals info and sends it on the heartbeat socket.
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
	msg, err := proto.Marshal(info)
	if err != nil {
		return err
	}
	return h.send2(h.sockHB, msg, "HeartBeat")
}

// SendOnly marshals info and sends it to key on the worker socket
// without waiting for any response.
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
	msg, err := proto.Marshal(info)
	if err != nil {
		return err
	}
	if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
		return fmt.Errorf("SendOnly Failed: %d", r)
	}
	return nil
}

// Pub marshals info and sends it on the topic-update socket.
func (h *Handle) Pub(info *MsgInfo) error {
	msg, err := proto.Marshal(info)
	if err != nil {
		return err
	}
	return h.send2(h.sockUp, msg, "Pub")
}

// Request marshals info, sends it to key on the worker socket and
// waits for the response. It retries until a response is received and
// decoded, or until the Handle's context is done. It returns nil when
// info cannot be marshalled or the context is done first.
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
	msg, err := proto.Marshal(info)
	if err != nil {
		return nil
	}

	// Synchronous call: keep trying until a reply can be returned.
	for {
		select {
		case <-h.ctx.Done():
			return nil
		default:
		}

		data, err := h.sockWorker.sock.SendAndRecv(msg, key)
		if err != nil {
			continue
		}
		// Unmarshal needs an allocated message; use a fresh one per
		// attempt so a failed decode leaves nothing behind.
		ret := &MsgInfo{}
		if err := proto.Unmarshal(data, ret); err == nil {
			return ret
		}
	}
}

// Reply marshals info and sends it to key on the reply socket.
func (h *Handle) Reply(key int, info *MsgInfo) error {
	msg, err := proto.Marshal(info)
	if err != nil {
		return err
	}
	if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
		return fmt.Errorf("Reply Failed: %d", r)
	}
	return nil
}

// GetMesg returns, without blocking, at most one pending subscription
// message and at most one pending reply message. subMsg and replyMsg
// are nil when nothing is pending on the respective channel; replyKey
// is the key the reply message was received from and is 0 when
// replyMsg is nil.
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
	select {
	case m := <-h.chSub:
		subMsg = m.info
	default:
	}
	select {
	case m := <-h.chReply:
		replyMsg = m.info
		replyKey = m.port
	default:
	}
	return subMsg, replyMsg, replyKey
}