From 1baf50119d7d19b276b132f6837e86b396f186ef Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 30 七月 2020 11:09:26 +0800
Subject: [PATCH] update dgram socket

---
 library.go |  267 +++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 216 insertions(+), 51 deletions(-)

diff --git a/library.go b/library.go
index e8b107c..0f32d87 100644
--- a/library.go
+++ b/library.go
@@ -1,6 +1,7 @@
 package softbus
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -8,86 +9,193 @@
 )
 
 const (
-	// HeartbeatKey fixed key for hb to servermanager
-	HeartbeatKey = 11
 	// RegKey fixed key for hb to servermanager
 	RegKey = 12
-	// UpKey fixed key for update topic to servermanager
-	UpKey = 13
 	// GetTopicInfoTypeTopic topic
 	GetTopicInfoTypeTopic = "gettopic"
 	// GetTopicInfoTypeChannel channel
 	GetTopicInfoTypeChannel = "getchannel"
 )
 
-type shmKeyAndProcInfo struct {
+type subOReply struct {
 	sock *DgramSocket
 	info *ProcInfo
 }
 
+type sockRe struct {
+	sock *DgramSocket
+	peer int
+}
+
+// TransInfo 浼犺緭鐨勬暟鎹拰蹇呰鐨勮褰�
+type TransInfo struct {
+	info *MsgInfo
+	port int
+}
+
 // Handle handler
 type Handle struct {
-	m          map[string]*shmKeyAndProcInfo
-	sockWorker *DgramSocket
+	ctx context.Context
+	// 鍒涘缓channel瀵瑰簲鐨剅eply,绛夊緟璇诲彇鍏朵腑鐨勫唴瀹�,server
+	// 鍏朵腑蹇呴』鏈変竴涓綔涓篟equest鍑芥暟鐨剆erver
+	m map[string]*subOReply
+	// 鍒涘缓蹇冭烦杩炴帴,client,浠呭彂閫佸績璺充俊鎭�
+	sockHB *sockRe
+	// 鍒涘缓鏇存柊涓婚杩炴帴,client,浠呭彂閫佷富棰樻洿鏂颁俊鎭�
+	sockUp *sockRe
+	// 鍒涘缓璁㈤槄鐨剆ocket
+	sockSub *subOReply
+	// 鍒涘缓reply鏈嶅姟Request鍑芥暟
+	sockRep *sockRe
+	// 鍒涘缓涓�涓竾鑳絪ocket鍙戦�佺粰浠绘剰server
+	sockWorker *sockRe
+
+	chSub   chan TransInfo
+	chReply chan TransInfo
+}
+
+func garbageCollect(ctx context.Context, h *Handle) {
+
+	<-ctx.Done()
+
+	for _, v := range h.m {
+		v.sock.Close()
+	}
+	h.sockHB.sock.Close()
+	h.sockUp.sock.Close()
+	h.sockSub.sock.Close()
+	h.sockRep.sock.Close()
+	h.sockWorker.sock.Close()
+}
+
+func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if data, peer, err := sock.RecvFrom(); err == nil {
+				var info *MsgInfo
+				if err := proto.Unmarshal(data, info); err == nil {
+					ch <- TransInfo{
+						info: info,
+						port: peer,
+					}
+				}
+			}
+			time.Sleep(50 * time.Millisecond)
+		}
+	}
 }
 
 // Register reg
-func Register(info *RegisterInfo) *Handle {
-	m := make(map[string]*shmKeyAndProcInfo)
+func Register(ctx context.Context, info *RegisterInfo) *Handle {
+	m := make(map[string]*subOReply)
 
 	// 棣栧厛璇锋眰涓�鍫唊ey
 	sockReg := OpenDgramSocket()
 	if sockReg == nil {
 		return nil
 	}
+	defer sockReg.Close()
+
 	var msg, rdata []byte
 	var err error
+loop:
 	for {
-		if msg == nil {
-			if msg, err = proto.Marshal(info); err != nil {
-				time.Sleep(100 * time.Millisecond)
-				continue
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+
+			if msg == nil {
+				if msg, err = proto.Marshal(info); err != nil {
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+			}
+			if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
+				break loop
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
+	// 寰楀埌key,璧嬪��
+	var regReply RegisterInfoReply
+	if err := proto.Unmarshal(rdata, &regReply); err != nil {
+		return nil
+	}
+
+	// 鏀跺彂req/rep channel
+	for _, v := range info.Channel {
+		if k, ok := regReply.ChannelKey[v]; ok {
+			s := OpenDgramSocket()
+			s.Bind(int(k))
+			m[v] = &subOReply{
+				sock: s,
+				info: info.ProcInfo,
 			}
 		}
-		if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
-	sockReg.Close()
-	// 寰楀埌key,璧嬪��
-	if rdata != nil {
-
-	}
-	// 鍙敹涓嶅彂
-	for _, v := range info.Channel {
-		s := OpenDgramSocket()
-		m[v] = &shmKeyAndProcInfo{
-			sock: s,
-			info: info.ProcInfo,
-		}
 	}
 
-	// pub/sub浣跨敤鍚屼竴涓猻ocket
-	pbs := OpenDgramSocket()
-	for _, v := range info.PubTopic {
-		m[v] = &shmKeyAndProcInfo{
-			sock: pbs,
-			info: info.ProcInfo,
-		}
+	chSize := 5
+	chSub := make(chan TransInfo, chSize)
+	chReply := make(chan TransInfo, chSize)
+
+	// heartbeat浣跨敤涓�涓猻ocket
+	sockHB := OpenDgramSocket()
+	hbr := &sockRe{
+		sock: sockHB,
+		peer: int(regReply.HeartbeatKey),
 	}
-	for _, v := range info.SubTopic {
-		m[v] = &shmKeyAndProcInfo{
-			sock: pbs,
-			info: info.ProcInfo,
-		}
+	// 鏇存柊涓婚浣跨敤涓�涓�
+	sockUp := OpenDgramSocket()
+	upr := &sockRe{
+		sock: sockUp,
+		peer: int(regReply.UpdateTopicKey),
 	}
 
-	s := OpenDgramSocket()
-	return &Handle{
+	// sub浣跨敤涓�涓猻ocket
+	sockSub := OpenDgramSocket()
+	sockSub.Bind(int(regReply.SubTopicKey))
+	// 鍚姩鎺ユ敹绾跨▼
+	go recvRoutine(ctx, sockSub, chSub)
+	sub := &subOReply{
+		sock: sockSub,
+		info: info.ProcInfo,
+	}
+	// reply浣跨敤涓�涓�,鏈嶅姟Request
+	sockReply := OpenDgramSocket()
+	sockReply.Bind(int(regReply.ReplyKey))
+	// 鍚姩鎺ユ敹绾跨▼
+	go recvRoutine(ctx, sockReply, chSub)
+	rer := &sockRe{
+		sock: sockReply,
+		peer: -1,
+	}
+
+	// 涓囪兘socket,浠呬綔涓哄鎴风浣跨敤
+	sockW := OpenDgramSocket()
+	swr := &sockRe{
+		sock: sockW,
+		peer: -1,
+	}
+	handle := &Handle{
+		ctx:        ctx,
 		m:          m,
-		sockWorker: s,
+		sockHB:     hbr,
+		sockUp:     upr,
+		sockSub:    sub,
+		sockRep:    rer,
+		sockWorker: swr,
+		chSub:      chSub,
+		chReply:    chReply,
 	}
+
+	go garbageCollect(ctx, handle)
+
+	return handle
 }
 
 // GetTopicInfo get topic info
@@ -98,8 +206,8 @@
 	return -1
 }
 
-func (h *Handle) send2(data []byte, key int, logID string) error {
-	if r := h.sockWorker.SendTo(data, key); r != 0 {
+func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
+	if r := sr.sock.SendTo(data, sr.peer); r != 0 {
 		return fmt.Errorf("%s SendTo Failed: %d", logID, r)
 	}
 	return nil
@@ -109,7 +217,7 @@
 func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		return h.send2(msg, HeartbeatKey, "HeartBeat")
+		return h.send2(h.sockHB, msg, "HeartBeat")
 	}
 	return err
 }
@@ -118,13 +226,70 @@
 func (h *Handle) SendOnly(key int, info *MsgInfo) error {
 	msg, err := proto.Marshal(info)
 	if err == nil {
-		return h.send2(msg, key, "SendOnly/Pub")
+		if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
+			return fmt.Errorf("SendOnly Failed: %d", r)
+		}
 	}
 	return err
 }
 
 // Pub func
 func (h *Handle) Pub(info *MsgInfo) error {
-	// return h.SendOnly(PubKey, info)
-	return nil
+	msg, err := proto.Marshal(info)
+	if err == nil {
+		return h.send2(h.sockUp, msg, "Pub")
+	}
+	return err
+}
+
+// Request req sync
+func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
+	msg, err := proto.Marshal(info)
+	if err != nil {
+		return nil
+	}
+
+	// 鍚屾鎺ュ彛,闇�瑕佺瓑寰呰繑鍥炲��
+	var ret *MsgInfo
+loop:
+	for {
+		select {
+		case <-h.ctx.Done():
+			return nil
+		default:
+			if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
+				if err := proto.Unmarshal(data, ret); err == nil {
+					break loop
+				}
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+	return ret
+}
+
+// Reply request
+func (h *Handle) Reply(key int, info *MsgInfo) error {
+	msg, err := proto.Marshal(info)
+	if err == nil {
+		if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
+			return fmt.Errorf("Reply Failed: %d", r)
+		}
+	}
+	return err
+}
+
+// GetMesg get mesg for sub or reply
+func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
+	if len(h.chSub) > 1 {
+		m := <-h.chSub
+		subMsg = m.info
+	}
+
+	if len(h.chReply) > 1 {
+		m := <-h.chReply
+		replyMsg = m.info
+		replyKey = m.port
+	}
+	return
 }

--
Gitblit v1.8.0