From dba48754d9623a49b155e94c65341b773bf4eeef Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 31 七月 2020 09:37:27 +0800 Subject: [PATCH] add get topic key --- library.go | 112 ++++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 72 insertions(+), 40 deletions(-) diff --git a/library.go b/library.go index 242ac00..d81f0ce 100644 --- a/library.go +++ b/library.go @@ -3,6 +3,7 @@ import ( "context" "fmt" + "sync" "time" "github.com/golang/protobuf/proto" @@ -17,12 +18,12 @@ GetTopicInfoTypeChannel = "getchannel" ) -type subOReply struct { +type sockServer struct { sock *DgramSocket info *ProcInfo } -type sockRe struct { +type sockClient struct { sock *DgramSocket peer int } @@ -34,21 +35,31 @@ } // Handle handler +/* +sockHB/sockPub/sockWorker鍙互浣跨敤涓�涓猻ocket +浣嗘槸鐢变簬闇�瑕佹敮鎸佸绾跨▼涓斿績璺�/鍙戝竷閮芥槸寰堥噸瑕佺殑淇℃伅,鍗曠嫭涓�涓猻ocket澶勭悊 +worker澶勭悊鐭椂鐨勫彂閫� +*/ type Handle struct { ctx context.Context // 鍒涘缓channel瀵瑰簲鐨剅eply,绛夊緟璇诲彇鍏朵腑鐨勫唴瀹�,server // 鍏朵腑蹇呴』鏈変竴涓綔涓篟equest鍑芥暟鐨剆erver - m map[string]*subOReply - // 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭� - sockHB *sockRe - // 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭� - sockUp *sockRe - // 鍒涘缓璁㈤槄鐨剆ocket - sockSub *subOReply + m map[string]*sockServer // 鍒涘缓reply鏈嶅姟Request鍑芥暟 - sockRep *sockRe + sockRep *sockServer + // 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭� + // 蹇冭烦闇�瑕佷繚璇佸崟鐙殑socket鍙戦��,濡傛灉璺熷叾浠栧叡鐢╯ocket,濡傛灉闃诲灏辨棤娉曞彂閫� + sockHB *sockClient + // 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭� + // 鍙戦�佹湰韬殑pub淇℃伅,寰堝彲鑳藉叾浠栬繘绋嬩緷璧�,闇�瑕佸崟鐙瑂ocket澶勭悊 + sockPub *sockClient + // 鍒涘缓璁㈤槄鐨剆ocket + // 璁㈤槄鐨勪富棰樺彂閫佺殑娑堟伅 + sockSub *sockClient // 鍒涘缓涓�涓竾鑳絪ocket鍙戦�佺粰浠绘剰server - sockWorker *sockRe + sockWorker *sockClient + // 澶氱嚎绋� + mtxWorker sync.Mutex chSub chan TransInfo chReply chan TransInfo @@ -62,7 +73,7 @@ v.sock.Close() } h.sockHB.sock.Close() - h.sockUp.sock.Close() + h.sockPub.sock.Close() h.sockSub.sock.Close() h.sockRep.sock.Close() h.sockWorker.sock.Close() @@ -89,7 +100,7 @@ // Register reg func Register(ctx context.Context, info *RegisterInfo) *Handle { - m := make(map[string]*subOReply) + m := make(map[string]*sockServer) // 棣栧厛璇锋眰涓�鍫唊ey sockReg := OpenDgramSocket() @@ -125,12 +136,12 @@ return nil } - // 鏀跺彂req/rep channel + // 鏀跺彂req/rep channel, server for _, v := range info.Channel { if k, ok := regReply.ChannelKey[v]; ok { s := OpenDgramSocket() s.Bind(int(k)) - m[v] = &subOReply{ + m[v] = &sockServer{ sock: s, info: info.ProcInfo, } @@ -141,20 +152,29 @@ chSub := make(chan TransInfo, chSize) chReply := make(chan TransInfo, chSize) - // heartbeat浣跨敤涓�涓猻ocket + // reply浣跨敤涓�涓�,鏈嶅姟Request, server + sockReply := OpenDgramSocket() + sockReply.Bind(int(regReply.ReplyKey)) + // 鍚姩鎺ユ敹绾跨▼ + go recvRoutine(ctx, sockReply, chSub) + repS := &sockServer{ + sock: sockReply, + info: info.ProcInfo, + } + + // heartbeat浣跨敤涓�涓猻ocket, client sockHB := OpenDgramSocket() - hbr := &sockRe{ + hbC := &sockClient{ sock: sockHB, peer: int(regReply.HeartbeatKey), } - // 鏇存柊涓婚浣跨敤涓�涓� + // 鍙戝竷涓婚浣跨敤涓�涓�, client sockUp := OpenDgramSocket() - upr := &sockRe{ + pubC := &sockClient{ sock: sockUp, peer: int(regReply.UpdateTopicKey), } - - // sub浣跨敤涓�涓猻ocket + // sub浣跨敤涓�涓猻ocket, client sockSub := OpenDgramSocket() // sockSub.Bind(int(regReply.SubTopicKey)) // 璁㈤槄涓婚 @@ -163,34 +183,25 @@ } // 鍚姩鎺ユ敹绾跨▼ go recvRoutine(ctx, sockSub, chSub) - sub := &subOReply{ + subC := &sockClient{ sock: sockSub, - info: info.ProcInfo, - } - // reply浣跨敤涓�涓�,鏈嶅姟Request - sockReply := OpenDgramSocket() - sockReply.Bind(int(regReply.ReplyKey)) - // 鍚姩鎺ユ敹绾跨▼ - go recvRoutine(ctx, sockReply, chSub) - rer := &sockRe{ - sock: sockReply, peer: -1, } // 涓囪兘socket,浠呬綔涓哄鎴风浣跨敤 sockW := OpenDgramSocket() - swr := &sockRe{ + uniC := &sockClient{ sock: sockW, peer: -1, } handle := &Handle{ ctx: ctx, m: m, - sockHB: hbr, - sockUp: upr, - sockSub: sub, - sockRep: rer, - sockWorker: swr, + sockHB: hbC, + sockPub: pubC, + sockSub: subC, + sockRep: repS, + sockWorker: uniC, chSub: chSub, chReply: chReply, } @@ -207,11 +218,27 @@ if v, ok := h.m[topic]; ok { return v.sock.Port() } + // 杩滅▼鑾峰彇 + msg := &TopicInfo{ + Topic: topic, + TopicType: typ, + } + if data, err := proto.Marshal(msg); err == nil { + h.mtxWorker.Lock() + if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil { + h.mtxWorker.Unlock() + var rmsg *TopicInfoReply + if err := proto.Unmarshal(rdata, rmsg); err == nil { + return int(rmsg.Key) + } + } + h.mtxWorker.Unlock() + } return -1 } -func (h *Handle) send2(sr *sockRe, data []byte, logID string) error { - if r := sr.sock.SendTo(data, sr.peer); r != 0 { +func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { + if r := sc.sock.SendTo(data, sc.peer); r != 0 { return fmt.Errorf("%s SendTo Failed: %d", logID, r) } return nil @@ -228,6 +255,8 @@ // SendOnly no recv func (h *Handle) SendOnly(key int, info *MsgInfo) error { + h.mtxWorker.Lock() + defer h.mtxWorker.Unlock() msg, err := proto.Marshal(info) if err == nil { if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { @@ -241,13 +270,16 @@ func (h *Handle) Pub(info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { - return h.send2(h.sockUp, msg, "Pub") + return h.send2(h.sockPub, msg, "Pub") } return err } // Request req sync func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo { + h.mtxWorker.Lock() + defer h.mtxWorker.Unlock() + msg, err := proto.Marshal(info) if err != nil { return nil -- Gitblit v1.8.0