| | |
| | | package softbus |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | | |
| | |
| | | ) |
| | | |
// Fixed keys of the server manager endpoints.
const (
	// HeartbeatKey is the fixed key heartbeats are sent to.
	HeartbeatKey = 11
	// RegKey is the fixed key registration requests are sent to.
	RegKey = 12
	// UpKey is the fixed key topic updates are sent to.
	UpKey = 13
)

// Selectors for the kind of information requested from the server manager.
const (
	// GetTopicInfoTypeTopic selects topic information.
	GetTopicInfoTypeTopic = "gettopic"
	// GetTopicInfoTypeChannel selects channel information.
	GetTopicInfoTypeChannel = "getchannel"
)
| | | |
| | | type shmKeyAndProcInfo struct { |
| | | type subOReply struct { |
| | | sock *DgramSocket |
| | | info *ProcInfo |
| | | } |
| | | |
| | | type sockRe struct { |
| | | sock *DgramSocket |
| | | peer int |
| | | } |
| | | |
| | | // TransInfo 传输的数据和必要的记录 |
| | | type TransInfo struct { |
| | | info *MsgInfo |
| | | port int |
| | | } |
| | | |
| | | // Handle handler |
| | | type Handle struct { |
| | | m map[string]*shmKeyAndProcInfo |
| | | sockWorker *DgramSocket |
| | | ctx context.Context |
| | | // 创建channel对应的reply,等待读取其中的内容,server |
| | | // 其中必须有一个作为Request函数的server |
| | | m map[string]*subOReply |
| | | // 创建心跳连接,client,仅发送心跳信息 |
| | | sockHB *sockRe |
| | | // 创建更新主题连接,client,仅发送主题更新信息 |
| | | sockUp *sockRe |
| | | // 创建订阅的socket |
| | | sockSub *subOReply |
| | | // 创建reply服务Request函数 |
| | | sockRep *sockRe |
| | | // 创建一个万能socket发送给任意server |
| | | sockWorker *sockRe |
| | | |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | | } |
| | | |
| | | func garbageCollect(ctx context.Context, h *Handle) { |
| | | |
| | | <-ctx.Done() |
| | | |
| | | for _, v := range h.m { |
| | | v.sock.Close() |
| | | } |
| | | h.sockHB.sock.Close() |
| | | h.sockUp.sock.Close() |
| | | h.sockSub.sock.Close() |
| | | h.sockRep.sock.Close() |
| | | h.sockWorker.sock.Close() |
| | | } |
| | | |
| | | func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if data, peer, err := sock.RecvFrom(); err == nil { |
| | | var info *MsgInfo |
| | | if err := proto.Unmarshal(data, info); err == nil { |
| | | ch <- TransInfo{ |
| | | info: info, |
| | | port: peer, |
| | | } |
| | | } |
| | | } |
| | | time.Sleep(50 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Register reg |
| | | func Register(info *RegisterInfo) *Handle { |
| | | m := make(map[string]*shmKeyAndProcInfo) |
| | | func Register(ctx context.Context, info *RegisterInfo) *Handle { |
| | | m := make(map[string]*subOReply) |
| | | |
| | | // 首先请求一堆key |
| | | sockReg := OpenDgramSocket() |
| | | if sockReg == nil { |
| | | return nil |
| | | } |
| | | defer sockReg.Close() |
| | | |
| | | var msg, rdata []byte |
| | | var err error |
| | | loop: |
| | | for { |
| | | if msg == nil { |
| | | if msg, err = proto.Marshal(info); err != nil { |
| | | time.Sleep(100 * time.Millisecond) |
| | | continue |
| | | select { |
| | | case <-ctx.Done(): |
| | | return nil |
| | | default: |
| | | |
| | | if msg == nil { |
| | | if msg, err = proto.Marshal(info); err != nil { |
| | | time.Sleep(100 * time.Millisecond) |
| | | continue |
| | | } |
| | | } |
| | | if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil { |
| | | break loop |
| | | } |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | // 得到key,赋值 |
| | | var regReply RegisterInfoReply |
| | | if err := proto.Unmarshal(rdata, ®Reply); err != nil { |
| | | return nil |
| | | } |
| | | |
| | | // 收发req/rep channel |
| | | for _, v := range info.Channel { |
| | | if k, ok := regReply.ChannelKey[v]; ok { |
| | | s := OpenDgramSocket() |
| | | s.Bind(int(k)) |
| | | m[v] = &subOReply{ |
| | | sock: s, |
| | | info: info.ProcInfo, |
| | | } |
| | | } |
| | | if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil { |
| | | break |
| | | } |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | sockReg.Close() |
| | | // 得到key,赋值 |
| | | if rdata != nil { |
| | | |
| | | } |
| | | // 只收不发 |
| | | for _, v := range info.Channel { |
| | | s := OpenDgramSocket() |
| | | m[v] = &shmKeyAndProcInfo{ |
| | | sock: s, |
| | | info: info.ProcInfo, |
| | | } |
| | | } |
| | | |
| | | // pub/sub使用同一个socket |
| | | pbs := OpenDgramSocket() |
| | | for _, v := range info.PubTopic { |
| | | m[v] = &shmKeyAndProcInfo{ |
| | | sock: pbs, |
| | | info: info.ProcInfo, |
| | | } |
| | | chSize := 5 |
| | | chSub := make(chan TransInfo, chSize) |
| | | chReply := make(chan TransInfo, chSize) |
| | | |
| | | // heartbeat使用一个socket |
| | | sockHB := OpenDgramSocket() |
| | | hbr := &sockRe{ |
| | | sock: sockHB, |
| | | peer: int(regReply.HeartbeatKey), |
| | | } |
| | | for _, v := range info.SubTopic { |
| | | m[v] = &shmKeyAndProcInfo{ |
| | | sock: pbs, |
| | | info: info.ProcInfo, |
| | | } |
| | | // 更新主题使用一个 |
| | | sockUp := OpenDgramSocket() |
| | | upr := &sockRe{ |
| | | sock: sockUp, |
| | | peer: int(regReply.UpdateTopicKey), |
| | | } |
| | | |
| | | s := OpenDgramSocket() |
| | | return &Handle{ |
| | | // sub使用一个socket |
| | | sockSub := OpenDgramSocket() |
| | | sockSub.Bind(int(regReply.SubTopicKey)) |
| | | // 启动接收线程 |
| | | go recvRoutine(ctx, sockSub, chSub) |
| | | sub := &subOReply{ |
| | | sock: sockSub, |
| | | info: info.ProcInfo, |
| | | } |
| | | // reply使用一个,服务Request |
| | | sockReply := OpenDgramSocket() |
| | | sockReply.Bind(int(regReply.ReplyKey)) |
| | | // 启动接收线程 |
| | | go recvRoutine(ctx, sockReply, chSub) |
| | | rer := &sockRe{ |
| | | sock: sockReply, |
| | | peer: -1, |
| | | } |
| | | |
| | | // 万能socket,仅作为客户端使用 |
| | | sockW := OpenDgramSocket() |
| | | swr := &sockRe{ |
| | | sock: sockW, |
| | | peer: -1, |
| | | } |
| | | handle := &Handle{ |
| | | ctx: ctx, |
| | | m: m, |
| | | sockWorker: s, |
| | | sockHB: hbr, |
| | | sockUp: upr, |
| | | sockSub: sub, |
| | | sockRep: rer, |
| | | sockWorker: swr, |
| | | chSub: chSub, |
| | | chReply: chReply, |
| | | } |
| | | |
| | | go garbageCollect(ctx, handle) |
| | | |
| | | return handle |
| | | } |
| | | |
| | | // GetTopicInfo get topic info |
| | |
| | | return -1 |
| | | } |
| | | |
| | | func (h *Handle) send2(data []byte, key int, logID string) error { |
| | | if r := h.sockWorker.SendTo(data, key); r != 0 { |
| | | func (h *Handle) send2(sr *sockRe, data []byte, logID string) error { |
| | | if r := sr.sock.SendTo(data, sr.peer); r != 0 { |
| | | return fmt.Errorf("%s SendTo Failed: %d", logID, r) |
| | | } |
| | | return nil |
| | |
| | | func (h *Handle) HeartBeat(info *HeartbeatInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | return h.send2(msg, HeartbeatKey, "HeartBeat") |
| | | return h.send2(h.sockHB, msg, "HeartBeat") |
| | | } |
| | | return err |
| | | } |
| | |
| | | func (h *Handle) SendOnly(key int, info *MsgInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | return h.send2(msg, key, "SendOnly/Pub") |
| | | if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { |
| | | return fmt.Errorf("SendOnly Failed: %d", r) |
| | | } |
| | | } |
| | | return err |
| | | } |
| | | |
| | | // Pub func |
| | | func (h *Handle) Pub(info *MsgInfo) error { |
| | | // return h.SendOnly(PubKey, info) |
| | | return nil |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | return h.send2(h.sockUp, msg, "Pub") |
| | | } |
| | | return err |
| | | } |
| | | |
| | | // Request req sync |
| | | func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo { |
| | | msg, err := proto.Marshal(info) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | | |
| | | // 同步接口,需要等待返回值 |
| | | var ret *MsgInfo |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-h.ctx.Done(): |
| | | return nil |
| | | default: |
| | | if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil { |
| | | if err := proto.Unmarshal(data, ret); err == nil { |
| | | break loop |
| | | } |
| | | } |
| | | time.Sleep(100 * time.Millisecond) |
| | | } |
| | | } |
| | | return ret |
| | | } |
| | | |
| | | // Reply request |
| | | func (h *Handle) Reply(key int, info *MsgInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | if r := h.sockRep.sock.SendTo(msg, key); r != 0 { |
| | | return fmt.Errorf("Reply Failed: %d", r) |
| | | } |
| | | } |
| | | return err |
| | | } |
| | | |
| | | // GetMesg get mesg for sub or reply |
| | | func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { |
| | | if len(h.chSub) > 1 { |
| | | m := <-h.chSub |
| | | subMsg = m.info |
| | | } |
| | | |
| | | if len(h.chReply) > 1 { |
| | | m := <-h.chReply |
| | | replyMsg = m.info |
| | | replyKey = m.port |
| | | } |
| | | return |
| | | } |