From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期三, 15 一月 2020 09:59:15 +0800
Subject: [PATCH] msg consume once

---
 mangos.go |   29 +++++++++++++++++++++--------
 pubsub.go |    1 +
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/mangos.go b/mangos.go
index 5f6b441..0272673 100644
--- a/mangos.go
+++ b/mangos.go
@@ -85,6 +85,10 @@
 	return pub,nil
 }
 
+const (
+	msgTopicAll = "sub-msg-all-topic"
+)
+
 func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
 	client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
 	if err !=nil {
@@ -99,14 +103,23 @@
 		recvCh: make(chan Message,50),
 		surveyors: gopherdiscovery.NewStringSet(),
 	}
+	var receivedCache = make(map[string]string)
 	go func() {
 		peers, _ := client.Peers()
 		for msg := range peers {
 			//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
 			var recvMsg Message
 			if err := json.Unmarshal(msg, &recvMsg);err ==nil {
-				if matchTopic(recvMsg.Topic, topics) {
-					sub.recvCh <- recvMsg
+				if b,matchedTopic := matchTopic(&recvMsg, topics);b {
+					if lastMsgId,ok := receivedCache[matchedTopic];ok {
+						if lastMsgId != recvMsg.Id {
+							receivedCache[matchedTopic] = recvMsg.Id
+							sub.recvCh <- recvMsg
+						}
+					} else {
+						receivedCache[matchedTopic] = recvMsg.Id
+						sub.recvCh <- recvMsg
+					}
 				}
 			}
 		}
@@ -115,16 +128,16 @@
 	return sub,nil
 }
 
-func matchTopic(topic string,subTopics []string) bool {
-	if subTopics ==nil && len(subTopics) ==0 {
-		return true
+func matchTopic(msg *Message,subTopics []string) (bool,string) {
+	if subTopics ==nil || len(subTopics) ==0 {
+		return true,msgTopicAll
 	}
 	for _,t := range subTopics {
-		if topic == t {
-			return true
+		if msg.Topic == t {
+			return true,msg.Topic
 		}
 	}
-	return false
+	return false,""
 }
 
 func (ps *mangosPubSub) Surveyor() []string {
diff --git a/pubsub.go b/pubsub.go
index f073179..847d74a 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -10,6 +10,7 @@
 }
 
 type Message struct {
+	Id string
 	Topic string
 	Msg []byte
 }

--
Gitblit v1.8.0