From 1baf50119d7d19b276b132f6837e86b396f186ef Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 30 七月 2020 11:09:26 +0800 Subject: [PATCH] update dgram socket --- library.go | 267 +++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 216 insertions(+), 51 deletions(-) diff --git a/library.go b/library.go index e8b107c..0f32d87 100644 --- a/library.go +++ b/library.go @@ -1,6 +1,7 @@ package softbus import ( + "context" "fmt" "time" @@ -8,86 +9,193 @@ ) const ( - // HeartbeatKey fixed key for hb to servermanager - HeartbeatKey = 11 // RegKey fixed key for hb to servermanager RegKey = 12 - // UpKey fixed key for update topic to servermanager - UpKey = 13 // GetTopicInfoTypeTopic topic GetTopicInfoTypeTopic = "gettopic" // GetTopicInfoTypeChannel channel GetTopicInfoTypeChannel = "getchannel" ) -type shmKeyAndProcInfo struct { +type subOReply struct { sock *DgramSocket info *ProcInfo } +type sockRe struct { + sock *DgramSocket + peer int +} + +// TransInfo 浼犺緭鐨勬暟鎹拰蹇呰鐨勮褰� +type TransInfo struct { + info *MsgInfo + port int +} + // Handle handler type Handle struct { - m map[string]*shmKeyAndProcInfo - sockWorker *DgramSocket + ctx context.Context + // 鍒涘缓channel瀵瑰簲鐨剅eply,绛夊緟璇诲彇鍏朵腑鐨勫唴瀹�,server + // 鍏朵腑蹇呴』鏈変竴涓綔涓篟equest鍑芥暟鐨剆erver + m map[string]*subOReply + // 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭� + sockHB *sockRe + // 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭� + sockUp *sockRe + // 鍒涘缓璁㈤槄鐨剆ocket + sockSub *subOReply + // 鍒涘缓reply鏈嶅姟Request鍑芥暟 + sockRep *sockRe + // 鍒涘缓涓�涓竾鑳絪ocket鍙戦�佺粰浠绘剰server + sockWorker *sockRe + + chSub chan TransInfo + chReply chan TransInfo +} + +func garbageCollect(ctx context.Context, h *Handle) { + + <-ctx.Done() + + for _, v := range h.m { + v.sock.Close() + } + h.sockHB.sock.Close() + h.sockUp.sock.Close() + h.sockSub.sock.Close() + h.sockRep.sock.Close() + h.sockWorker.sock.Close() +} + +func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) { + for { + select { + case <-ctx.Done(): + return + default: + if data, peer, err := sock.RecvFrom(); err == nil { + var info *MsgInfo + if err := proto.Unmarshal(data, info); err == nil { + ch <- TransInfo{ + info: info, + port: peer, + } + } + } + time.Sleep(50 * time.Millisecond) + } + } } // Register reg -func Register(info *RegisterInfo) *Handle { - m := make(map[string]*shmKeyAndProcInfo) +func Register(ctx context.Context, info *RegisterInfo) *Handle { + m := make(map[string]*subOReply) // 棣栧厛璇锋眰涓�鍫唊ey sockReg := OpenDgramSocket() if sockReg == nil { return nil } + defer sockReg.Close() + var msg, rdata []byte var err error +loop: for { - if msg == nil { - if msg, err = proto.Marshal(info); err != nil { - time.Sleep(100 * time.Millisecond) - continue + select { + case <-ctx.Done(): + return nil + default: + + if msg == nil { + if msg, err = proto.Marshal(info); err != nil { + time.Sleep(100 * time.Millisecond) + continue + } + } + if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil { + break loop + } + time.Sleep(100 * time.Millisecond) + } + } + + // 寰楀埌key,璧嬪�� + var regReply RegisterInfoReply + if err := proto.Unmarshal(rdata, ®Reply); err != nil { + return nil + } + + // 鏀跺彂req/rep channel + for _, v := range info.Channel { + if k, ok := regReply.ChannelKey[v]; ok { + s := OpenDgramSocket() + s.Bind(int(k)) + m[v] = &subOReply{ + sock: s, + info: info.ProcInfo, } } - if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - sockReg.Close() - // 寰楀埌key,璧嬪�� - if rdata != nil { - - } - // 鍙敹涓嶅彂 - for _, v := range info.Channel { - s := OpenDgramSocket() - m[v] = &shmKeyAndProcInfo{ - sock: s, - info: info.ProcInfo, - } } - // pub/sub浣跨敤鍚屼竴涓猻ocket - pbs := OpenDgramSocket() - for _, v := range info.PubTopic { - m[v] = &shmKeyAndProcInfo{ - sock: pbs, - info: info.ProcInfo, - } + chSize := 5 + chSub := make(chan TransInfo, chSize) + chReply := make(chan TransInfo, chSize) + + // heartbeat浣跨敤涓�涓猻ocket + sockHB := OpenDgramSocket() + hbr := &sockRe{ + sock: sockHB, + peer: int(regReply.HeartbeatKey), } - for _, v := range info.SubTopic { - m[v] = &shmKeyAndProcInfo{ - sock: pbs, - info: info.ProcInfo, - } + // 鏇存柊涓婚浣跨敤涓�涓� + sockUp := OpenDgramSocket() + upr := &sockRe{ + sock: sockUp, + peer: int(regReply.UpdateTopicKey), } - s := OpenDgramSocket() - return &Handle{ + // sub浣跨敤涓�涓猻ocket + sockSub := OpenDgramSocket() + sockSub.Bind(int(regReply.SubTopicKey)) + // 鍚姩鎺ユ敹绾跨▼ + go recvRoutine(ctx, sockSub, chSub) + sub := &subOReply{ + sock: sockSub, + info: info.ProcInfo, + } + // reply浣跨敤涓�涓�,鏈嶅姟Request + sockReply := OpenDgramSocket() + sockReply.Bind(int(regReply.ReplyKey)) + // 鍚姩鎺ユ敹绾跨▼ + go recvRoutine(ctx, sockReply, chSub) + rer := &sockRe{ + sock: sockReply, + peer: -1, + } + + // 涓囪兘socket,浠呬綔涓哄鎴风浣跨敤 + sockW := OpenDgramSocket() + swr := &sockRe{ + sock: sockW, + peer: -1, + } + handle := &Handle{ + ctx: ctx, m: m, - sockWorker: s, + sockHB: hbr, + sockUp: upr, + sockSub: sub, + sockRep: rer, + sockWorker: swr, + chSub: chSub, + chReply: chReply, } + + go garbageCollect(ctx, handle) + + return handle } // GetTopicInfo get topic info @@ -98,8 +206,8 @@ return -1 } -func (h *Handle) send2(data []byte, key int, logID string) error { - if r := h.sockWorker.SendTo(data, key); r != 0 { +func (h *Handle) send2(sr *sockRe, data []byte, logID string) error { + if r := sr.sock.SendTo(data, sr.peer); r != 0 { return fmt.Errorf("%s SendTo Failed: %d", logID, r) } return nil @@ -109,7 +217,7 @@ func (h *Handle) HeartBeat(info *HeartbeatInfo) error { msg, err := proto.Marshal(info) if err == nil { - return h.send2(msg, HeartbeatKey, "HeartBeat") + return h.send2(h.sockHB, msg, "HeartBeat") } return err } @@ -118,13 +226,70 @@ func (h *Handle) SendOnly(key int, info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { - return h.send2(msg, key, "SendOnly/Pub") + if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { + return fmt.Errorf("SendOnly Failed: %d", r) + } } return err } // Pub func func (h *Handle) Pub(info *MsgInfo) error { - // return h.SendOnly(PubKey, info) - return nil + msg, err := proto.Marshal(info) + if err == nil { + return h.send2(h.sockUp, msg, "Pub") + } + return err +} + +// Request req sync +func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo { + msg, err := proto.Marshal(info) + if err != nil { + return nil + } + + // 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲�� + var ret *MsgInfo +loop: + for { + select { + case <-h.ctx.Done(): + return nil + default: + if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil { + if err := proto.Unmarshal(data, ret); err == nil { + break loop + } + } + time.Sleep(100 * time.Millisecond) + } + } + return ret +} + +// Reply request +func (h *Handle) Reply(key int, info *MsgInfo) error { + msg, err := proto.Marshal(info) + if err == nil { + if r := h.sockRep.sock.SendTo(msg, key); r != 0 { + return fmt.Errorf("Reply Failed: %d", r) + } + } + return err +} + +// GetMesg get mesg for sub or reply +func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { + if len(h.chSub) > 1 { + m := <-h.chSub + subMsg = m.info + } + + if len(h.chReply) > 1 { + m := <-h.chReply + replyMsg = m.info + replyKey = m.port + } + return } -- Gitblit v1.8.0