From a4ea6380ed70468b1bbaca0328a65686960fcd52 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期一, 20 四月 2020 16:23:30 +0800
Subject: [PATCH] add svLock

---
 client.go |   95 +++++++++++++++++++++++++++++------------------
 1 files changed, 59 insertions(+), 36 deletions(-)

diff --git a/client.go b/client.go
index b70d4e2..d9a211b 100644
--- a/client.go
+++ b/client.go
@@ -1,9 +1,9 @@
 package gopherdiscovery
 
 import (
+	"encoding/json"
 	"errors"
 	"log"
-	"strings"
 
 	"nanomsg.org/go-mangos"
 	"nanomsg.org/go-mangos/protocol/respondent"
@@ -25,7 +25,9 @@
 
 	// Service that needs to be discovered, for example for a web server could be
 	// http://192.168.1.1:8080
-	service string
+	service ServiceInfo
+
+	heartbeatmsg chan []byte
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -41,14 +43,19 @@
 	ctx  context.Context
 	sock mangos.Socket
 
-	changes chan []string
+	changes chan []byte
 }
 
-func Client(urlServer string, service string) (*DiscoveryClient, error) {
-	return ClientWithSub(urlServer, "", service)
+type ServiceInfo struct {
+	ServiceId string `json:"serviceId"`
+	Info []byte `json:"info"`
 }
 
-func ClientWithSub(urlServer string, urlPubSub string, service string) (*DiscoveryClient, error) {
+func Client(urlServer string, serviceId string) (*DiscoveryClient, error) {
+	return ClientWithSub(urlServer, "", serviceId)
+}
+
+func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) {
 	var sock mangos.Socket
 	var err error
 	var subscriber *Subscriber
@@ -75,21 +82,29 @@
 		return nil, err
 	}
 
+	svInfo := ServiceInfo{
+		ServiceId: serviceId,
+	}
 	client := &DiscoveryClient{
-		urlServer:  urlServer,
-		urlPubSub:  urlPubSub,
-		service:    service,
-		ctx:        ctx,
-		cancel:     cancel,
-		sock:       sock,
-		subscriber: subscriber,
+		urlServer:    urlServer,
+		urlPubSub:    urlPubSub,
+		service:      svInfo,
+		ctx:          ctx,
+		cancel:       cancel,
+		sock:         sock,
+		heartbeatmsg: make(chan []byte),
+		subscriber:   subscriber,
 	}
 
 	go client.run()
 	return client, nil
 }
 
-func (d *DiscoveryClient) Peers() (chan []string, error) {
+func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
+	return d.heartbeatmsg
+}
+
+func (d *DiscoveryClient) Peers() (chan []byte, error) {
 	if d.subscriber == nil {
 		return nil, errors.New("No subscribe url is provided to discover the Peers")
 	}
@@ -100,23 +115,38 @@
 	d.cancel()
 }
 
+func (d *DiscoveryClient) SetResp(i []byte) {
+	d.service.Info = i
+}
+
 func (d *DiscoveryClient) run() {
 	var err error
+	var msg []byte
 	for {
-		_, err = d.sock.Recv()
-
-		if err != nil {
-			log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
-		} else {
+		select {
+		case <-d.ctx.Done():
+			close(d.heartbeatmsg)
+			return
+		default:
+			msg, err = d.sock.Recv()
+			if err != nil {
+				log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
+				continue
+			}
+			sendB, err := json.Marshal(d.service)
+			if err != nil {
+				log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error())
+				continue
+			}
+			err = d.sock.Send(sendB)
+			if err != nil {
+				log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
+				continue
+			}
 			select {
-			case <-d.ctx.Done():
-				return
-
+			case d.heartbeatmsg <- msg:
+				log.Println("recv heartbeat msg. ", msg)
 			default:
-				err = d.sock.Send([]byte(d.service))
-				if err != nil {
-					log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
-				}
 			}
 		}
 	}
@@ -147,14 +177,14 @@
 		url:     url,
 		ctx:     ctx,
 		sock:    sock,
-		changes: make(chan []string, 8),
+		changes: make(chan []byte, 8),
 	}
 
 	go subscriber.run()
 	return subscriber, nil
 }
 
-func (s *Subscriber) Changes() chan []string {
+func (s *Subscriber) Changes() chan []byte {
 	return s.changes
 }
 
@@ -171,15 +201,8 @@
 			msg, err = s.sock.Recv()
 			if err != nil {
 				log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
-
 			}
-
-			// non-blocking send to the channel, discards changes if the channel is not ready
-			select {
-			case s.changes <- strings.Split(string(msg), "|"):
-			default:
-			}
-
+			s.changes <- msg
 		}
 	}
 }

--
Gitblit v1.8.0