From fb64156e926cfd98d0b4891543bdb47151272486 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期一, 13 一月 2020 16:35:24 +0800
Subject: [PATCH] fix

---
 mangos.go |  154 ++++++++++++++++++++++++--------------------------
 1 files changed, 74 insertions(+), 80 deletions(-)

diff --git a/mangos.go b/mangos.go
index 50b56f5..169c2c4 100644
--- a/mangos.go
+++ b/mangos.go
@@ -1,116 +1,110 @@
 package pubsub
 
 import (
-	"context"
+	"basic.com/valib/gopherdiscovery.git"
 	"encoding/json"
 	"fmt"
-	"nanomsg.org/go-mangos"
-	"nanomsg.org/go-mangos/protocol/pub"
-	"nanomsg.org/go-mangos/protocol/sub"
-	"nanomsg.org/go-mangos/transport/ipc"
-	"nanomsg.org/go-mangos/transport/tcp"
+	"time"
 )
 
 type mangosPubSub struct {
 	url string
-
-	ctx  context.Context
-
-	sock mangos.Socket
-
-	pubCh chan []byte  //publish msg chan
+	heartBeatUrl string
+	pubCh chan Message  //publish msg chan
+	aliveNodes gopherdiscovery.StringSet
 
 	recvCh chan Message  //recv msg chan
 }
 
-func newPub(url string) (*mangosPubSub,error) {
-	var sock mangos.Socket
+
+func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
+	var discoveryServer *gopherdiscovery.DiscoveryServer
 	var err error
+	var (
+		defaultOpts = gopherdiscovery.Options{
+			SurveyTime:   3 * time.Second,
+			//RecvDeadline: 3 * time.Second,
+			PollTime:     5 * time.Second,
+		}
+	)
 
-	sock, err = pub.NewSocket()
-	if err != nil {
-		return nil, err
-	}
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
+	discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
 
-	err = sock.Listen(url)
-	if err != nil {
-		return nil, err
-	}
-	ctx, cancel := context.WithCancel(context.Background())
+	fmt.Println("err:",err)
 	pub := &mangosPubSub{
-		url: url,
-		ctx: ctx,
-		sock: sock,
-		pubCh: make(chan []byte),
+		url: publishUrl,
+		heartBeatUrl: heartBeatUrl,
+		aliveNodes: gopherdiscovery.NewStringSet(),
+		pubCh: make(chan Message, 50),
 	}
+	var msgCache = make(map[string]Message)
+	//clientMsgCh := make(map[string]chan Message)
+	cacheNodes := gopherdiscovery.NewStringSet()
 	go func() {
 		for {
 			select {
-			case <-ctx.Done():
-				close(pub.pubCh)
-				cancel()
-				return
 			case msg := <-pub.pubCh:
-				err := pub.sock.Send(msg)
-				if err != nil {
-					fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
+				msgCache[msg.Topic] = msg
+				//if len(clientMsgCh) > 0 {
+				//	for _, ch := range clientMsgCh {
+				//		ch <- msg
+				//	}
+				//}
+				if cacheNodes.Cardinality() >0 {
+					sendB, _ := json.Marshal(msg)
+					discoveryServer.PublishMsg(string(sendB))
+				}
+			default:
+				nodeIds := discoveryServer.AliveNodes().ToSlice()
+				if len(nodeIds) >0 {
+					//for _,nodeId := range nodeIds {
+					//	if _,ok := clientMsgCh[nodeId]; !ok {
+					//		clientMsgCh[nodeId] = make(chan Message)
+					//	}
+					//}
+
+					if cacheNodes.Cardinality() == 0 { //绗竴娆℃湁涓婄嚎鐨勮妭鐐�
+						if len(msgCache) > 0 {
+							for _,cMsg := range msgCache {
+								sendB, _ := json.Marshal(cMsg)
+								discoveryServer.PublishMsg(string(sendB))
+							}
+						}
+					}
+					cacheNodes = discoveryServer.AliveNodes()
+				} else {
+					time.Sleep(10 * time.Millisecond)
 				}
 			}
 		}
 	}()
+	go func() {
+
+	}()
 	return pub,nil
 }
 
-func newSub(url string, topics []string) (*mangosPubSub,error) {
-	var sock mangos.Socket
-	var err error
-
-	sock, err = sub.NewSocket()
-	if err != nil {
-		return nil, err
+func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
+	client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
+	if err !=nil {
+		return nil,err
 	}
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
-
-	err = sock.Dial(url)
-	if err != nil {
-		return nil, err
-	}
-	// subscribes to everything
-	err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
-	if err != nil {
-		return nil, err
-	}
-	ctx, cancel := context.WithCancel(context.Background())
+	heartMsg := client.HeartBeatMsg()
+	_= <-heartMsg
+	fmt.Println("heat beat with server success")
 	sub := &mangosPubSub{
-		url:url,
-		ctx: ctx,
-		sock: sock,
+		url:subcribeUrl,
+		heartBeatUrl: heartBeatUrl,
 		recvCh: make(chan Message,50),
 	}
-
-	var msg []byte
 	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				close(sub.recvCh)
-				cancel()
-				return
-			default:
-				msg, err = sub.sock.Recv()
-				if err != nil {
-					fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
-				} else {
-					//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
-					var recvMsg Message
-					if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
-						if matchTopic(recvMsg.Topic, topics) {
-							sub.recvCh <- recvMsg
-						}
-					}
+		peers, _ := client.Peers()
+		for msg := range peers {
+			//鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅
+			var recvMsg Message
+			if err := json.Unmarshal(msg, &recvMsg);err ==nil {
+				if matchTopic(recvMsg.Topic, topics) {
+					sub.recvCh <- recvMsg
 				}
 			}
 		}
@@ -131,7 +125,7 @@
 	return false
 }
 
-func (ps *mangosPubSub) Publish(msg []byte) {
+func (ps *mangosPubSub) Publish(msg Message) {
 	ps.pubCh <- msg
 }
 

--
Gitblit v1.8.0