From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期三, 15 一月 2020 09:59:15 +0800
Subject: [PATCH] msg consume once

---
 mangos.go |   63 +++++++++++++++++++------------
 1 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/mangos.go b/mangos.go
index 47a9b0d..0272673 100644
--- a/mangos.go
+++ b/mangos.go
@@ -8,11 +8,10 @@
 )
 
 type mangosPubSub struct {
-	url string
+	url          string
 	heartBeatUrl string
-	pubCh chan Message  //publish msg chan
-	aliveNodes gopherdiscovery.StringSet
-	clients map[string][]string
+	pubCh        chan Message  //publish msg chan
+	surveyors    gopherdiscovery.StringSet
 
 	recvCh chan Message  //recv msg chan
 }
@@ -31,17 +30,15 @@
 
 	discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
 
-	fmt.Println("err:",err)
+	fmt.Println("newPub err:",err)
 	pub := &mangosPubSub{
-		url: publishUrl,
+		url:          publishUrl,
 		heartBeatUrl: heartBeatUrl,
-		aliveNodes: gopherdiscovery.NewStringSet(),
-		pubCh: make(chan Message, 50),
-		clients: make(map[string][]string),
+		surveyors:    gopherdiscovery.NewStringSet(),
+		pubCh:        make(chan Message, 50),
 	}
 	var msgCache = make(map[string]Message)
 	//clientMsgCh := make(map[string]chan Message)
-	cacheNodes := gopherdiscovery.NewStringSet()
 	go func() {
 		for {
 			select {
@@ -52,7 +49,7 @@
 				//		ch <- msg
 				//	}
 				//}
-				if cacheNodes.Cardinality() >0 {
+				if pub.surveyors.Cardinality() >0 {
 					sendB, _ := json.Marshal(msg)
 					discoveryServer.PublishMsg(string(sendB))
 				}
@@ -64,10 +61,10 @@
 				//		clientMsgCh[nodeId] = make(chan Message)
 				//	}
 				//}
-				removedNodes := cacheNodes.Difference(nodeIds)
-				addedNodes := nodeIds.Difference(cacheNodes)
+				removedNodes := pub.surveyors.Difference(nodeIds)
+				addedNodes := nodeIds.Difference(pub.surveyors)
 				if len(nodeIds.ToSlice()) >0 {
-					if removedNodes.Cardinality() >0 || addedNodes.Cardinality() >0 { //鑺傜偣鏈夊彉鍖栫殑鏃跺�欙紝涔熼渶瑕佸彂娑堟伅
+					if addedNodes.Cardinality() >0 { //鏈夋柊鑺傜偣涓婄嚎鐨勬椂鍊欙紝闇�瑕佸彂涓�娆℃秷鎭紝鑺傜偣绂荤嚎鐨勬椂鍊欎笉鐢ㄧ
 						fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
 						if len(msgCache) > 0 {
 							for _,cMsg := range msgCache {
@@ -76,9 +73,9 @@
 							}
 						}
 					}
-					cacheNodes = nodeIds
+					pub.surveyors = nodeIds
 				} else {//璁㈤槄鑰呭叏閮ㄩ樀浜�
-					cacheNodes = nodeIds
+					pub.surveyors = nodeIds
 					time.Sleep(10 * time.Millisecond)
 				}
 			}
@@ -87,6 +84,10 @@
 
 	return pub,nil
 }
+
+const (
+	msgTopicAll = "sub-msg-all-topic"
+)
 
 func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
 	client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
@@ -100,15 +101,25 @@
 		url:subcribeUrl,
 		heartBeatUrl: heartBeatUrl,
 		recvCh: make(chan Message,50),
+		surveyors: gopherdiscovery.NewStringSet(),
 	}
+	var receivedCache = make(map[string]string)
 	go func() {
 		peers, _ := client.Peers()
 		for msg := range peers {
 			//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
 			var recvMsg Message
 			if err := json.Unmarshal(msg, &recvMsg);err ==nil {
-				if matchTopic(recvMsg.Topic, topics) {
-					sub.recvCh <- recvMsg
+				if b,matchedTopic := matchTopic(&recvMsg, topics);b {
+					if lastMsgId,ok := receivedCache[matchedTopic];ok {
+						if lastMsgId != recvMsg.Id {
+							receivedCache[matchedTopic] = recvMsg.Id
+							sub.recvCh <- recvMsg
+						}
+					} else {
+						receivedCache[matchedTopic] = recvMsg.Id
+						sub.recvCh <- recvMsg
+					}
 				}
 			}
 		}
@@ -117,16 +128,20 @@
 	return sub,nil
 }
 
-func matchTopic(topic string,subTopics []string) bool {
-	if subTopics ==nil && len(subTopics) ==0 {
-		return true
+func matchTopic(msg *Message,subTopics []string) (bool,string) {
+	if subTopics ==nil || len(subTopics) ==0 {
+		return true,msgTopicAll
 	}
 	for _,t := range subTopics {
-		if topic == t {
-			return true
+		if msg.Topic == t {
+			return true,msg.Topic
 		}
 	}
-	return false
+	return false,""
+}
+
+func (ps *mangosPubSub) Surveyor() []string {
+	return ps.surveyors.ToSlice()
 }
 
 func (ps *mangosPubSub) Publish(msg Message) {

--
Gitblit v1.8.0