From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期三, 15 一月 2020 09:59:15 +0800
Subject: [PATCH] msg consume once
---
mangos.go | 29 +++++++++++++++++++++--------
pubsub.go | 1 +
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/mangos.go b/mangos.go
index 5f6b441..0272673 100644
--- a/mangos.go
+++ b/mangos.go
@@ -85,6 +85,10 @@
return pub,nil
}
+const (
+ msgTopicAll = "sub-msg-all-topic"
+)
+
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
if err !=nil {
@@ -99,14 +103,23 @@
recvCh: make(chan Message,50),
surveyors: gopherdiscovery.NewStringSet(),
}
+ var receivedCache = make(map[string]string)
go func() {
peers, _ := client.Peers()
for msg := range peers {
//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
var recvMsg Message
if err := json.Unmarshal(msg, &recvMsg);err ==nil {
- if matchTopic(recvMsg.Topic, topics) {
- sub.recvCh <- recvMsg
+ if b,matchedTopic := matchTopic(&recvMsg, topics);b {
+ if lastMsgId,ok := receivedCache[matchedTopic];ok {
+ if lastMsgId != recvMsg.Id {
+ receivedCache[matchedTopic] = recvMsg.Id
+ sub.recvCh <- recvMsg
+ }
+ } else {
+ receivedCache[matchedTopic] = recvMsg.Id
+ sub.recvCh <- recvMsg
+ }
}
}
}
@@ -115,16 +128,16 @@
return sub,nil
}
-func matchTopic(topic string,subTopics []string) bool {
- if subTopics ==nil && len(subTopics) ==0 {
- return true
+func matchTopic(msg *Message,subTopics []string) (bool,string) {
+ if subTopics ==nil || len(subTopics) ==0 {
+ return true,msgTopicAll
}
for _,t := range subTopics {
- if topic == t {
- return true
+ if msg.Topic == t {
+ return true,msg.Topic
}
}
- return false
+ return false,""
}
func (ps *mangosPubSub) Surveyor() []string {
diff --git a/pubsub.go b/pubsub.go
index f073179..847d74a 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -10,6 +10,7 @@
}
type Message struct {
+ Id string
Topic string
Msg []byte
}
--
Gitblit v1.8.0