From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期三, 15 一月 2020 09:59:15 +0800 Subject: [PATCH] msg consume once --- mangos.go | 29 +++++++++++++++++++++-------- pubsub.go | 1 + 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/mangos.go b/mangos.go index 5f6b441..0272673 100644 --- a/mangos.go +++ b/mangos.go @@ -85,6 +85,10 @@ return pub,nil } +const ( + msgTopicAll = "sub-msg-all-topic" +) + func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) if err !=nil { @@ -99,14 +103,23 @@ recvCh: make(chan Message,50), surveyors: gopherdiscovery.NewStringSet(), } + var receivedCache = make(map[string]string) go func() { peers, _ := client.Peers() for msg := range peers { //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅 var recvMsg Message if err := json.Unmarshal(msg, &recvMsg);err ==nil { - if matchTopic(recvMsg.Topic, topics) { - sub.recvCh <- recvMsg + if b,matchedTopic := matchTopic(&recvMsg, topics);b { + if lastMsgId,ok := receivedCache[matchedTopic];ok { + if lastMsgId != recvMsg.Id { + receivedCache[matchedTopic] = recvMsg.Id + sub.recvCh <- recvMsg + } + } else { + receivedCache[matchedTopic] = recvMsg.Id + sub.recvCh <- recvMsg + } } } } @@ -115,16 +128,16 @@ return sub,nil } -func matchTopic(topic string,subTopics []string) bool { - if subTopics ==nil && len(subTopics) ==0 { - return true +func matchTopic(msg *Message,subTopics []string) (bool,string) { + if subTopics ==nil || len(subTopics) ==0 { + return true,msgTopicAll } for _,t := range subTopics { - if topic == t { - return true + if msg.Topic == t { + return true,msg.Topic } } - return false + return false,"" } func (ps *mangosPubSub) Surveyor() []string { diff --git a/pubsub.go b/pubsub.go index f073179..847d74a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -10,6 +10,7 @@ } type Message struct { + Id string Topic string Msg []byte } -- Gitblit v1.8.0