From 445d8e4320e6785346e1f4ea6c789c084c8a5e90 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期三, 15 一月 2020 09:59:15 +0800
Subject: [PATCH] msg consume once
---
mangos.go | 179 ++++++++++++++++++++++++++++++++---------------------------
1 files changed, 96 insertions(+), 83 deletions(-)
diff --git a/mangos.go b/mangos.go
index 50b56f5..0272673 100644
--- a/mangos.go
+++ b/mangos.go
@@ -1,115 +1,124 @@
package pubsub
import (
- "context"
+ "basic.com/valib/gopherdiscovery.git"
"encoding/json"
"fmt"
- "nanomsg.org/go-mangos"
- "nanomsg.org/go-mangos/protocol/pub"
- "nanomsg.org/go-mangos/protocol/sub"
- "nanomsg.org/go-mangos/transport/ipc"
- "nanomsg.org/go-mangos/transport/tcp"
+ "time"
)
type mangosPubSub struct {
- url string
-
- ctx context.Context
-
- sock mangos.Socket
-
- pubCh chan []byte //publish msg chan
+ url string
+ heartBeatUrl string
+ pubCh chan Message //publish msg chan
+ surveyors gopherdiscovery.StringSet
recvCh chan Message //recv msg chan
}
-func newPub(url string) (*mangosPubSub,error) {
- var sock mangos.Socket
+
+func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
+ var discoveryServer *gopherdiscovery.DiscoveryServer
var err error
+ var (
+ defaultOpts = gopherdiscovery.Options{
+ SurveyTime: 3 * time.Second,
+ //RecvDeadline: 3 * time.Second,
+ PollTime: 5 * time.Second,
+ }
+ )
- sock, err = pub.NewSocket()
- if err != nil {
- return nil, err
- }
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
+ discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
- err = sock.Listen(url)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
+ fmt.Println("newPub err:",err)
pub := &mangosPubSub{
- url: url,
- ctx: ctx,
- sock: sock,
- pubCh: make(chan []byte),
+ url: publishUrl,
+ heartBeatUrl: heartBeatUrl,
+ surveyors: gopherdiscovery.NewStringSet(),
+ pubCh: make(chan Message, 50),
}
+ var msgCache = make(map[string]Message)
+ //clientMsgCh := make(map[string]chan Message)
go func() {
for {
select {
- case <-ctx.Done():
- close(pub.pubCh)
- cancel()
- return
case msg := <-pub.pubCh:
- err := pub.sock.Send(msg)
- if err != nil {
- fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
+ msgCache[msg.Topic] = msg
+ //if len(clientMsgCh) > 0 {
+ // for _, ch := range clientMsgCh {
+ // ch <- msg
+ // }
+ //}
+ if pub.surveyors.Cardinality() >0 {
+ sendB, _ := json.Marshal(msg)
+ discoveryServer.PublishMsg(string(sendB))
+ }
+ default:
+ nodeIds := discoveryServer.AliveNodes()
+
+ //for _,nodeId := range nodeIds {
+ // if _,ok := clientMsgCh[nodeId]; !ok {
+ // clientMsgCh[nodeId] = make(chan Message)
+ // }
+ //}
+ removedNodes := pub.surveyors.Difference(nodeIds)
+ addedNodes := nodeIds.Difference(pub.surveyors)
+ if len(nodeIds.ToSlice()) >0 {
+ if addedNodes.Cardinality() >0 { //鏈夋柊鑺傜偣涓婄嚎鐨勬椂鍊欙紝闇�瑕佸彂涓�娆℃秷鎭紝鑺傜偣绂荤嚎鐨勬椂鍊欎笉鐢ㄧ
+ fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
+ if len(msgCache) > 0 {
+ for _,cMsg := range msgCache {
+ sendB, _ := json.Marshal(cMsg)
+ discoveryServer.PublishMsg(string(sendB))
+ }
+ }
+ }
+ pub.surveyors = nodeIds
+ } else {//璁㈤槄鑰呭叏閮ㄩ樀浜�
+ pub.surveyors = nodeIds
+ time.Sleep(10 * time.Millisecond)
}
}
}
}()
+
return pub,nil
}
-func newSub(url string, topics []string) (*mangosPubSub,error) {
- var sock mangos.Socket
- var err error
+const (
+ msgTopicAll = "sub-msg-all-topic"
+)
- sock, err = sub.NewSocket()
- if err != nil {
- return nil, err
+func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
+ client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
+ if err !=nil {
+ return nil,err
}
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
-
- err = sock.Dial(url)
- if err != nil {
- return nil, err
- }
- // subscribes to everything
- err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
+ heartMsg := client.HeartBeatMsg()
+ _= <-heartMsg
+ fmt.Println("heat beat with server success")
sub := &mangosPubSub{
- url:url,
- ctx: ctx,
- sock: sock,
+ url:subcribeUrl,
+ heartBeatUrl: heartBeatUrl,
recvCh: make(chan Message,50),
+ surveyors: gopherdiscovery.NewStringSet(),
}
-
- var msg []byte
+ var receivedCache = make(map[string]string)
go func() {
- for {
- select {
- case <-ctx.Done():
- close(sub.recvCh)
- cancel()
- return
- default:
- msg, err = sub.sock.Recv()
- if err != nil {
- fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
- } else {
- //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
- var recvMsg Message
- if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
- if matchTopic(recvMsg.Topic, topics) {
+ peers, _ := client.Peers()
+ for msg := range peers {
+ //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
+ var recvMsg Message
+ if err := json.Unmarshal(msg, &recvMsg);err ==nil {
+ if b,matchedTopic := matchTopic(&recvMsg, topics);b {
+ if lastMsgId,ok := receivedCache[matchedTopic];ok {
+ if lastMsgId != recvMsg.Id {
+ receivedCache[matchedTopic] = recvMsg.Id
sub.recvCh <- recvMsg
}
+ } else {
+ receivedCache[matchedTopic] = recvMsg.Id
+ sub.recvCh <- recvMsg
}
}
}
@@ -119,19 +128,23 @@
return sub,nil
}
-func matchTopic(topic string,subTopics []string) bool {
- if subTopics ==nil && len(subTopics) ==0 {
- return true
+func matchTopic(msg *Message,subTopics []string) (bool,string) {
+ if subTopics ==nil || len(subTopics) ==0 {
+ return true,msgTopicAll
}
for _,t := range subTopics {
- if topic == t {
- return true
+ if msg.Topic == t {
+ return true,msg.Topic
}
}
- return false
+ return false,""
}
-func (ps *mangosPubSub) Publish(msg []byte) {
+func (ps *mangosPubSub) Surveyor() []string {
+ return ps.surveyors.ToSlice()
+}
+
+func (ps *mangosPubSub) Publish(msg Message) {
ps.pubCh <- msg
}
--
Gitblit v1.8.0