From a4ea6380ed70468b1bbaca0328a65686960fcd52 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期一, 20 四月 2020 16:23:30 +0800
Subject: [PATCH] add svLock

---
 server.go |   67 +++++++++++++++++++++------------
 1 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/server.go b/server.go
index 770732b..f23728c 100644
--- a/server.go
+++ b/server.go
@@ -1,12 +1,10 @@
 package gopherdiscovery
 
 import (
-	"fmt"
+	"encoding/json"
 	"log"
-	"strings"
+	"sync"
 	"time"
-
-	"basic.com/pubsub/protomsg.git"
 
 	"golang.org/x/net/context"
 
@@ -22,7 +20,7 @@
 	// responses
 	SurveyTime time.Duration
 	// RecvDeadline is the time until the next recived of the SURVEY times out.
-	RecvDeadline time.Duration
+	//RecvDeadline time.Duration
 	// PollTime is minimal time between SURVEYS (The time between SURVEYS could be greater than this time
 	// if the SURVEY process takes longer than that time)
 	PollTime time.Duration
@@ -53,6 +51,9 @@
 	nodes StringSet
 	// publisher, we are going to publish the changes of the set here
 	publisher *Publisher
+
+	svInfo map[string][]byte
+	svLock sync.RWMutex
 }
 
 type Publisher struct {
@@ -62,7 +63,7 @@
 	ctx  context.Context
 	sock mangos.Socket
 
-	publishCh chan []string
+	publishCh chan []byte
 }
 
 func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) {
@@ -88,10 +89,10 @@
 	if err != nil {
 		return nil, err
 	}
-	err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
-	if err != nil {
-		return nil, err
-	}
+	//err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
+	//if err != nil {
+	//	return nil, err
+	//}
 
 	pubCtx, pubCancel := context.WithCancel(ctx)
 	publisher, err = NewPublisher(pubCtx, urlPubSub)
@@ -142,6 +143,7 @@
 	var err error
 	var msg []byte
 	var responses StringSet
+	var si ServiceInfo
 
 	err = d.sock.Send([]byte("are you ok?"))
 	if err != nil {
@@ -150,24 +152,24 @@
 	}
 
 	responses = NewStringSet()
+	d.services.svInfo = make(map[string][]byte, 0)
 	for {
 		msg, err = d.sock.Recv()
 		if err != nil {
-			if err == mangos.ErrRecvTimeout {//鍦ㄨ秴鏃舵椂闂村唴鏀跺埌鐨勫搷搴�
+			if err == mangos.ErrProtoState {
 				// Timeout means I can add the current responses to the SET
-				fmt.Println("mangos.ErrRecvTimeout")
 				d.services.Add(responses)
 				return
 			}
-			//fmt.Println(" err: ",err.Error())
-			//log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error())
 		} else {
-			fmt.Printf("poll received msg:%s \n",string(msg))
-			responses.Add(string(msg))
-			//break
+			if  json.Unmarshal(msg, &si) == nil {
+				responses.Add(si.ServiceId)
+				d.services.svLock.Lock()
+				d.services.svInfo[si.ServiceId] = si.Info
+				d.services.svLock.Unlock()
+			}
 		}
 	}
-	fmt.Println("for out")
 }
 
 func NewPublisher(ctx context.Context, url string) (*Publisher, error) {
@@ -191,14 +193,14 @@
 		url:  url,
 		sock: sock,
 
-		publishCh: make(chan []string),
+		publishCh: make(chan []byte),
 	}
 
 	go publiser.run()
 	return publiser, nil
 }
 
-func (p *Publisher) Publish(msg []string) {
+func (p *Publisher) Publish(msg []byte) {
 	p.publishCh <- msg
 }
 
@@ -209,7 +211,7 @@
 			close(p.publishCh)
 			return
 		case msg := <-p.publishCh:
-			err := p.sock.Send([]byte(strings.Join(msg, "|")))
+			err := p.sock.Send(msg)
 			if err != nil {
 				log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error())
 			}
@@ -221,6 +223,7 @@
 	s := &Services{
 		nodes:     NewStringSet(),
 		publisher: publisher,
+		svInfo: make(map[string][]byte, 0),
 	}
 
 	return s
@@ -237,9 +240,23 @@
 
 	s.nodes = responses
 	// publish the changes
-	s.publisher.Publish(s.nodes.ToSlice())
+	//s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed
 }
 
-func (d *DiscoveryServer) PublishMsg(msg protomsg.PublishMessage){
-	d.services.publisher.Publish([]string{string(msg)})
-}
\ No newline at end of file
+func (d *DiscoveryServer) AliveNodes() StringSet {
+	return d.services.nodes
+}
+
+func (d *DiscoveryServer) SvInfo() map[string][]byte {
+	d.services.svLock.Lock()
+	defer d.services.svLock.Unlock()
+	m := make(map[string][]byte)
+	for k,v := range d.services.svInfo {
+		m[k] = v
+	}
+	return m
+}
+
+func (d *DiscoveryServer) PublishMsg(msg string) {
+	d.services.publisher.Publish([]byte(msg))
+}

--
Gitblit v1.8.0