From dba48754d9623a49b155e94c65341b773bf4eeef Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 31 七月 2020 09:37:27 +0800
Subject: [PATCH] add get topic key

---
 library.go |  112 ++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 72 insertions(+), 40 deletions(-)

diff --git a/library.go b/library.go
index 242ac00..d81f0ce 100644
--- a/library.go
+++ b/library.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -17,12 +18,12 @@
 	GetTopicInfoTypeChannel = "getchannel"
 )
 
-type subOReply struct {
+type sockServer struct {
 	sock *DgramSocket
 	info *ProcInfo
 }
 
-type sockRe struct {
+type sockClient struct {
 	sock *DgramSocket
 	peer int
 }
@@ -34,21 +35,31 @@
 }
 
 // Handle handler
+/*
+sockHB/sockPub/sockWorker鍙互浣跨敤涓�涓猻ocket
+浣嗘槸鐢变簬闇�瑕佹敮鎸佸绾跨▼涓斿績璺�/鍙戝竷閮芥槸寰堥噸瑕佺殑淇℃伅,鍗曠嫭涓�涓猻ocket澶勭悊
+worker澶勭悊鐭椂鐨勫彂閫�
+*/
 type Handle struct {
 	ctx context.Context
 	// 鍒涘缓channel瀵瑰簲鐨剅eply,绛夊緟璇诲彇鍏朵腑鐨勫唴瀹�,server
 	// 鍏朵腑蹇呴』鏈変竴涓綔涓篟equest鍑芥暟鐨剆erver
-	m map[string]*subOReply
-	// 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭�
-	sockHB *sockRe
-	// 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭�
-	sockUp *sockRe
-	// 鍒涘缓璁㈤槄鐨剆ocket
-	sockSub *subOReply
+	m map[string]*sockServer
 	// 鍒涘缓reply鏈嶅姟Request鍑芥暟
-	sockRep *sockRe
+	sockRep *sockServer
+	// 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭�
+	// 蹇冭烦闇�瑕佷繚璇佸崟鐙殑socket鍙戦��,濡傛灉璺熷叾浠栧叡鐢╯ocket,濡傛灉闃诲灏辨棤娉曞彂閫�
+	sockHB *sockClient
+	// 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭�
+	// 鍙戦�佹湰韬殑pub淇℃伅,寰堝彲鑳藉叾浠栬繘绋嬩緷璧�,闇�瑕佸崟鐙瑂ocket澶勭悊
+	sockPub *sockClient
+	// 鍒涘缓璁㈤槄鐨剆ocket
+	// 璁㈤槄鐨勪富棰樺彂閫佺殑娑堟伅
+	sockSub *sockClient
 	// 鍒涘缓涓�涓竾鑳絪ocket鍙戦�佺粰浠绘剰server
-	sockWorker *sockRe
+	sockWorker *sockClient
+	// 澶氱嚎绋�
+	mtxWorker sync.Mutex
 
 	chSub   chan TransInfo
 	chReply chan TransInfo
@@ -62,7 +73,7 @@
 		v.sock.Close()
 	}
 	h.sockHB.sock.Close()
-	h.sockUp.sock.Close()
+	h.sockPub.sock.Close()
 	h.sockSub.sock.Close()
 	h.sockRep.sock.Close()
 	h.sockWorker.sock.Close()
@@ -89,7 +100,7 @@
 
 // Register reg
 func Register(ctx context.Context, info *RegisterInfo) *Handle {
-	m := make(map[string]*subOReply)
+	m := make(map[string]*sockServer)
 
 	// 棣栧厛璇锋眰涓�鍫唊ey
 	sockReg := OpenDgramSocket()
@@ -125,12 +136,12 @@
 		return nil
 	}
 
-	// 鏀跺彂req/rep channel
+	// 鏀跺彂req/rep channel, server
 	for _, v := range info.Channel {
 		if k, ok := regReply.ChannelKey[v]; ok {
 			s := OpenDgramSocket()
 			s.Bind(int(k))
-			m[v] = &subOReply{
+			m[v] = &sockServer{
 				sock: s,
 				info: info.ProcInfo,
 			}
@@ -141,20 +152,29 @@
 	chSub := make(chan TransInfo, chSize)
 	chReply := make(chan TransInfo, chSize)
 
-	// heartbeat浣跨敤涓�涓猻ocket
+	// reply浣跨敤涓�涓�,鏈嶅姟Request, server
+	sockReply := OpenDgramSocket()
+	sockReply.Bind(int(regReply.ReplyKey))
+	// 鍚姩鎺ユ敹绾跨▼
+	go recvRoutine(ctx, sockReply, chSub)
+	repS := &sockServer{
+		sock: sockReply,
+		info: info.ProcInfo,
+	}
+
+	// heartbeat浣跨敤涓�涓猻ocket, client
 	sockHB := OpenDgramSocket()
-	hbr := &sockRe{
+	hbC := &sockClient{
 		sock: sockHB,
 		peer: int(regReply.HeartbeatKey),
 	}
-	// 鏇存柊涓婚浣跨敤涓�涓�
+	// 鍙戝竷涓婚浣跨敤涓�涓�, client
 	sockUp := OpenDgramSocket()
-	upr := &sockRe{
+	pubC := &sockClient{
 		sock: sockUp,
 		peer: int(regReply.UpdateTopicKey),
 	}
-
-	// sub浣跨敤涓�涓猻ocket
+	// sub浣跨敤涓�涓猻ocket, client
 	sockSub := OpenDgramSocket()
 	// sockSub.Bind(int(regReply.SubTopicKey))
 	// 璁㈤槄涓婚
@@ -163,34 +183,25 @@
 	}
 	// 鍚姩鎺ユ敹绾跨▼
 	go recvRoutine(ctx, sockSub, chSub)
-	sub := &subOReply{
+	subC := &sockClient{
 		sock: sockSub,
-		info: info.ProcInfo,
-	}
-	// reply浣跨敤涓�涓�,鏈嶅姟Request
-	sockReply := OpenDgramSocket()
-	sockReply.Bind(int(regReply.ReplyKey))
-	// 鍚姩鎺ユ敹绾跨▼
-	go recvRoutine(ctx, sockReply, chSub)
-	rer := &sockRe{
-		sock: sockReply,
 		peer: -1,
 	}
 
 	// 涓囪兘socket,浠呬綔涓哄鎴风浣跨敤
 	sockW := OpenDgramSocket()
-	swr := &sockRe{
+	uniC := &sockClient{
 		sock: sockW,
 		peer: -1,
 	}
 	handle := &Handle{
 		ctx:        ctx,
 		m:          m,
-		sockHB:     hbr,
-		sockUp:     upr,
-		sockSub:    sub,
-		sockRep:    rer,
-		sockWorker: swr,
+		sockHB:     hbC,
+		sockPub:    pubC,
+		sockSub:    subC,
+		sockRep:    repS,
+		sockWorker: uniC,
 		chSub:      chSub,
 		chReply:    chReply,
 	}
@@ -207,11 +218,27 @@
 	if v, ok := h.m[topic]; ok {
 		return v.sock.Port()
 	}
+	// 杩滅▼鑾峰彇
+	msg := &TopicInfo{
+		Topic:     topic,
+		TopicType: typ,
+	}
+	if data, err := proto.Marshal(msg); err == nil {
+		h.mtxWorker.Lock()
+		if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
+			h.mtxWorker.Unlock()
+			var rmsg *TopicInfoReply
+			if err := proto.Unmarshal(rdata, rmsg); err == nil {
+				return int(rmsg.Key)
+			}
+		}
+		h.mtxWorker.Unlock()
+	}
 	return -1
 }
 
-func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
-	if r := sr.sock.SendTo(data, sr.peer); r != 0 {
+func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
+	if r := sc.sock.SendTo(data, sc.peer); r != 0 {
 		return fmt.Errorf("%s SendTo Failed: %d", logID, r)
 	}
 	return nil
@@ -228,6 +255,8 @@
 
 // SendOnly no recv
 func (h *Handle) SendOnly(key int, info *MsgInfo) error {
+	h.mtxWorker.Lock()
+	defer h.mtxWorker.Unlock()
 	msg, err := proto.Marshal(info)
 	if err == nil {
 		if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
@@ -241,13 +270,16 @@
 func (h *Handle) Pub(info *MsgInfo) error {
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		return h.send2(h.sockUp, msg, "Pub")
+		return h.send2(h.sockPub, msg, "Pub")
 	}
 	return err
 }
 
 // Request req sync
 func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
+	h.mtxWorker.Lock()
+	defer h.mtxWorker.Unlock()
+
 	msg, err := proto.Marshal(info)
 	if err != nil {
 		return nil

--
Gitblit v1.8.0