From 38eccdf006c8374e89b7b7bb816b2d4ce4b2a220 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期二, 14 一月 2020 20:31:34 +0800
Subject: [PATCH] fix msg send
---
mangos.go | 158 ++++++++++++++++++++++++++--------------------------
1 files changed, 78 insertions(+), 80 deletions(-)
diff --git a/mangos.go b/mangos.go
index 50b56f5..47a9b0d 100644
--- a/mangos.go
+++ b/mangos.go
@@ -1,116 +1,114 @@
package pubsub
import (
- "context"
+ "basic.com/valib/gopherdiscovery.git"
"encoding/json"
"fmt"
- "nanomsg.org/go-mangos"
- "nanomsg.org/go-mangos/protocol/pub"
- "nanomsg.org/go-mangos/protocol/sub"
- "nanomsg.org/go-mangos/transport/ipc"
- "nanomsg.org/go-mangos/transport/tcp"
+ "time"
)
type mangosPubSub struct {
url string
-
- ctx context.Context
-
- sock mangos.Socket
-
- pubCh chan []byte //publish msg chan
+ heartBeatUrl string
+ pubCh chan Message //publish msg chan
+ aliveNodes gopherdiscovery.StringSet
+ clients map[string][]string
recvCh chan Message //recv msg chan
}
-func newPub(url string) (*mangosPubSub,error) {
- var sock mangos.Socket
+
+func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
+ var discoveryServer *gopherdiscovery.DiscoveryServer
var err error
+ var (
+ defaultOpts = gopherdiscovery.Options{
+ SurveyTime: 3 * time.Second,
+ //RecvDeadline: 3 * time.Second,
+ PollTime: 5 * time.Second,
+ }
+ )
- sock, err = pub.NewSocket()
- if err != nil {
- return nil, err
- }
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
+ discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
- err = sock.Listen(url)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
+ fmt.Println("err:",err)
pub := &mangosPubSub{
- url: url,
- ctx: ctx,
- sock: sock,
- pubCh: make(chan []byte),
+ url: publishUrl,
+ heartBeatUrl: heartBeatUrl,
+ aliveNodes: gopherdiscovery.NewStringSet(),
+ pubCh: make(chan Message, 50),
+ clients: make(map[string][]string),
}
+ var msgCache = make(map[string]Message)
+ //clientMsgCh := make(map[string]chan Message)
+ cacheNodes := gopherdiscovery.NewStringSet()
go func() {
for {
select {
- case <-ctx.Done():
- close(pub.pubCh)
- cancel()
- return
case msg := <-pub.pubCh:
- err := pub.sock.Send(msg)
- if err != nil {
- fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
+ msgCache[msg.Topic] = msg
+ //if len(clientMsgCh) > 0 {
+ // for _, ch := range clientMsgCh {
+ // ch <- msg
+ // }
+ //}
+ if cacheNodes.Cardinality() >0 {
+ sendB, _ := json.Marshal(msg)
+ discoveryServer.PublishMsg(string(sendB))
+ }
+ default:
+ nodeIds := discoveryServer.AliveNodes()
+
+ //for _,nodeId := range nodeIds {
+ // if _,ok := clientMsgCh[nodeId]; !ok {
+ // clientMsgCh[nodeId] = make(chan Message)
+ // }
+ //}
+ removedNodes := cacheNodes.Difference(nodeIds)
+ addedNodes := nodeIds.Difference(cacheNodes)
+ if len(nodeIds.ToSlice()) >0 {
+ if removedNodes.Cardinality() >0 || addedNodes.Cardinality() >0 { //鑺傜偣鏈夊彉鍖栫殑鏃跺�欙紝涔熼渶瑕佸彂娑堟伅
+ fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
+ if len(msgCache) > 0 {
+ for _,cMsg := range msgCache {
+ sendB, _ := json.Marshal(cMsg)
+ discoveryServer.PublishMsg(string(sendB))
+ }
+ }
+ }
+ cacheNodes = nodeIds
+ } else {//璁㈤槄鑰呭叏閮ㄩ樀浜�
+ cacheNodes = nodeIds
+ time.Sleep(10 * time.Millisecond)
}
}
}
}()
+
return pub,nil
}
-func newSub(url string, topics []string) (*mangosPubSub,error) {
- var sock mangos.Socket
- var err error
-
- sock, err = sub.NewSocket()
- if err != nil {
- return nil, err
+func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
+ client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
+ if err !=nil {
+ return nil,err
}
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
-
- err = sock.Dial(url)
- if err != nil {
- return nil, err
- }
- // subscribes to everything
- err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
+ heartMsg := client.HeartBeatMsg()
+ _= <-heartMsg
+ fmt.Println("heat beat with server success")
sub := &mangosPubSub{
- url:url,
- ctx: ctx,
- sock: sock,
+ url:subcribeUrl,
+ heartBeatUrl: heartBeatUrl,
recvCh: make(chan Message,50),
}
-
- var msg []byte
go func() {
- for {
- select {
- case <-ctx.Done():
- close(sub.recvCh)
- cancel()
- return
- default:
- msg, err = sub.sock.Recv()
- if err != nil {
- fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
- } else {
- //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
- var recvMsg Message
- if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
- if matchTopic(recvMsg.Topic, topics) {
- sub.recvCh <- recvMsg
- }
- }
+ peers, _ := client.Peers()
+ for msg := range peers {
+ //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
+ var recvMsg Message
+ if err := json.Unmarshal(msg, &recvMsg);err ==nil {
+ if matchTopic(recvMsg.Topic, topics) {
+ sub.recvCh <- recvMsg
}
}
}
@@ -131,7 +129,7 @@
return false
}
-func (ps *mangosPubSub) Publish(msg []byte) {
+func (ps *mangosPubSub) Publish(msg Message) {
ps.pubCh <- msg
}
--
Gitblit v1.8.0