From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期三, 15 一月 2020 09:59:15 +0800
Subject: [PATCH] msg consume once
---
mangos.go | 61 ++++++++++++++++++++----------
1 files changed, 40 insertions(+), 21 deletions(-)
diff --git a/mangos.go b/mangos.go
index a194c2e..0272673 100644
--- a/mangos.go
+++ b/mangos.go
@@ -8,10 +8,10 @@
)
type mangosPubSub struct {
- url string
+ url string
heartBeatUrl string
- pubCh chan Message //publish msg chan
- aliveNodes gopherdiscovery.StringSet
+ pubCh chan Message //publish msg chan
+ surveyors gopherdiscovery.StringSet
recvCh chan Message //recv msg chan
}
@@ -30,16 +30,15 @@
discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
- fmt.Println("err:",err)
+ fmt.Println("newPub err:",err)
pub := &mangosPubSub{
- url: publishUrl,
+ url: publishUrl,
heartBeatUrl: heartBeatUrl,
- aliveNodes: gopherdiscovery.NewStringSet(),
- pubCh: make(chan Message, 50),
+ surveyors: gopherdiscovery.NewStringSet(),
+ pubCh: make(chan Message, 50),
}
var msgCache = make(map[string]Message)
//clientMsgCh := make(map[string]chan Message)
- cacheNodes := gopherdiscovery.NewStringSet()
go func() {
for {
select {
@@ -50,7 +49,7 @@
// ch <- msg
// }
//}
- if cacheNodes.Cardinality() >0 {
+ if pub.surveyors.Cardinality() >0 {
sendB, _ := json.Marshal(msg)
discoveryServer.PublishMsg(string(sendB))
}
@@ -62,9 +61,11 @@
// clientMsgCh[nodeId] = make(chan Message)
// }
//}
+ removedNodes := pub.surveyors.Difference(nodeIds)
+ addedNodes := nodeIds.Difference(pub.surveyors)
if len(nodeIds.ToSlice()) >0 {
- if len(nodeIds.Difference(cacheNodes).ToSlice()) > 0 { //鑺傜偣鏈夊彉鍖栫殑鏃跺�欙紝涔熼渶瑕佸彂娑堟伅
- fmt.Println("aliveNodes:",nodeIds)
+ if addedNodes.Cardinality() >0 { //鏈夋柊鑺傜偣涓婄嚎鐨勬椂鍊欙紝闇�瑕佸彂涓�娆℃秷鎭紝鑺傜偣绂荤嚎鐨勬椂鍊欎笉鐢ㄧ
+ fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
if len(msgCache) > 0 {
for _,cMsg := range msgCache {
sendB, _ := json.Marshal(cMsg)
@@ -72,9 +73,9 @@
}
}
}
- cacheNodes = discoveryServer.AliveNodes()
+ pub.surveyors = nodeIds
} else {//璁㈤槄鑰呭叏閮ㄩ樀浜�
- cacheNodes = discoveryServer.AliveNodes()
+ pub.surveyors = nodeIds
time.Sleep(10 * time.Millisecond)
}
}
@@ -83,6 +84,10 @@
return pub,nil
}
+
+const (
+ msgTopicAll = "sub-msg-all-topic"
+)
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
@@ -96,15 +101,25 @@
url:subcribeUrl,
heartBeatUrl: heartBeatUrl,
recvCh: make(chan Message,50),
+ surveyors: gopherdiscovery.NewStringSet(),
}
+ var receivedCache = make(map[string]string)
go func() {
peers, _ := client.Peers()
for msg := range peers {
//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
var recvMsg Message
if err := json.Unmarshal(msg, &recvMsg);err ==nil {
- if matchTopic(recvMsg.Topic, topics) {
- sub.recvCh <- recvMsg
+ if b,matchedTopic := matchTopic(&recvMsg, topics);b {
+ if lastMsgId,ok := receivedCache[matchedTopic];ok {
+ if lastMsgId != recvMsg.Id {
+ receivedCache[matchedTopic] = recvMsg.Id
+ sub.recvCh <- recvMsg
+ }
+ } else {
+ receivedCache[matchedTopic] = recvMsg.Id
+ sub.recvCh <- recvMsg
+ }
}
}
}
@@ -113,16 +128,20 @@
return sub,nil
}
-func matchTopic(topic string,subTopics []string) bool {
- if subTopics ==nil && len(subTopics) ==0 {
- return true
+func matchTopic(msg *Message,subTopics []string) (bool,string) {
+ if subTopics ==nil || len(subTopics) ==0 {
+ return true,msgTopicAll
}
for _,t := range subTopics {
- if topic == t {
- return true
+ if msg.Topic == t {
+ return true,msg.Topic
}
}
- return false
+ return false,""
+}
+
+func (ps *mangosPubSub) Surveyor() []string {
+ return ps.surveyors.ToSlice()
}
func (ps *mangosPubSub) Publish(msg Message) {
--
Gitblit v1.8.0