From 5860c5fd3e6ae9f5412c12cbf16086e8585f3ef1 Mon Sep 17 00:00:00 2001
From: 龙赣华 <slongertian@gmail.com>
Date: 星期一, 03 六月 2019 09:34:53 +0800
Subject: [PATCH] add heartbeat channels

---
 client.go |   69 ++++++++++++++++++----------------
 1 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/client.go b/client.go
index b70d4e2..2733153 100644
--- a/client.go
+++ b/client.go
@@ -3,7 +3,6 @@
 import (
 	"errors"
 	"log"
-	"strings"
 
 	"nanomsg.org/go-mangos"
 	"nanomsg.org/go-mangos/protocol/respondent"
@@ -27,6 +26,8 @@
 	// http://192.168.1.1:8080
 	service string
 
+	heartbeatmsg chan []byte
+
 	ctx    context.Context
 	cancel context.CancelFunc
 	sock   mangos.Socket
@@ -41,7 +42,7 @@
 	ctx  context.Context
 	sock mangos.Socket
 
-	changes chan []string
+	changes chan []byte
 }
 
 func Client(urlServer string, service string) (*DiscoveryClient, error) {
@@ -76,20 +77,25 @@
 	}
 
 	client := &DiscoveryClient{
-		urlServer:  urlServer,
-		urlPubSub:  urlPubSub,
-		service:    service,
-		ctx:        ctx,
-		cancel:     cancel,
-		sock:       sock,
-		subscriber: subscriber,
+		urlServer:    urlServer,
+		urlPubSub:    urlPubSub,
+		service:      service,
+		ctx:          ctx,
+		cancel:       cancel,
+		sock:         sock,
+		heartbeatmsg: make(chan []byte),
+		subscriber:   subscriber,
 	}
 
 	go client.run()
 	return client, nil
 }
 
-func (d *DiscoveryClient) Peers() (chan []string, error) {
+func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
+	return d.heartbeatmsg
+}
+
+func (d *DiscoveryClient) Peers() (chan []byte, error) {
 	if d.subscriber == nil {
 		return nil, errors.New("No subscribe url is provided to discover the Peers")
 	}
@@ -102,21 +108,27 @@
 
 func (d *DiscoveryClient) run() {
 	var err error
+	var msg []byte
 	for {
-		_, err = d.sock.Recv()
-
-		if err != nil {
-			log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
-		} else {
+		select {
+		case <-d.ctx.Done():
+			close(d.heartbeatmsg)
+			return
+		default:
+			msg, err = d.sock.Recv()
+			if err != nil {
+				log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
+				continue
+			}
+			err = d.sock.Send([]byte(d.service))
+			if err != nil {
+				log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
+				continue
+			}
 			select {
-			case <-d.ctx.Done():
-				return
-
+			case d.heartbeatmsg <- msg:
+				log.Println("recv heartbeat msg. ", msg)
 			default:
-				err = d.sock.Send([]byte(d.service))
-				if err != nil {
-					log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
-				}
 			}
 		}
 	}
@@ -147,14 +159,14 @@
 		url:     url,
 		ctx:     ctx,
 		sock:    sock,
-		changes: make(chan []string, 8),
+		changes: make(chan []byte, 8),
 	}
 
 	go subscriber.run()
 	return subscriber, nil
 }
 
-func (s *Subscriber) Changes() chan []string {
+func (s *Subscriber) Changes() chan []byte {
 	return s.changes
 }
 
@@ -171,15 +183,8 @@
 			msg, err = s.sock.Recv()
 			if err != nil {
 				log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
-
 			}
-
-			// non-blocking send to the channel, discards changes if the channel is not ready
-			select {
-			case s.changes <- strings.Split(string(msg), "|"):
-			default:
-			}
-
+			s.changes <- msg
 		}
 	}
 }

--
Gitblit v1.8.0