From 5860c5fd3e6ae9f5412c12cbf16086e8585f3ef1 Mon Sep 17 00:00:00 2001
From: 龙赣华 <slongertian@gmail.com>
Date: 星期一, 03 六月 2019 09:34:53 +0800
Subject: [PATCH] add heartbeat channels
---
client.go | 69 ++++++++++++++++++----------------
1 files changed, 37 insertions(+), 32 deletions(-)
diff --git a/client.go b/client.go
index b70d4e2..2733153 100644
--- a/client.go
+++ b/client.go
@@ -3,7 +3,6 @@
import (
"errors"
"log"
- "strings"
"nanomsg.org/go-mangos"
"nanomsg.org/go-mangos/protocol/respondent"
@@ -27,6 +26,8 @@
// http://192.168.1.1:8080
service string
+ heartbeatmsg chan []byte
+
ctx context.Context
cancel context.CancelFunc
sock mangos.Socket
@@ -41,7 +42,7 @@
ctx context.Context
sock mangos.Socket
- changes chan []string
+ changes chan []byte
}
func Client(urlServer string, service string) (*DiscoveryClient, error) {
@@ -76,20 +77,25 @@
}
client := &DiscoveryClient{
- urlServer: urlServer,
- urlPubSub: urlPubSub,
- service: service,
- ctx: ctx,
- cancel: cancel,
- sock: sock,
- subscriber: subscriber,
+ urlServer: urlServer,
+ urlPubSub: urlPubSub,
+ service: service,
+ ctx: ctx,
+ cancel: cancel,
+ sock: sock,
+ heartbeatmsg: make(chan []byte),
+ subscriber: subscriber,
}
go client.run()
return client, nil
}
-func (d *DiscoveryClient) Peers() (chan []string, error) {
+func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
+ return d.heartbeatmsg
+}
+
+func (d *DiscoveryClient) Peers() (chan []byte, error) {
if d.subscriber == nil {
return nil, errors.New("No subscribe url is provided to discover the Peers")
}
@@ -102,21 +108,27 @@
func (d *DiscoveryClient) run() {
var err error
+ var msg []byte
for {
- _, err = d.sock.Recv()
-
- if err != nil {
- log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
- } else {
+ select {
+ case <-d.ctx.Done():
+ close(d.heartbeatmsg)
+ return
+ default:
+ msg, err = d.sock.Recv()
+ if err != nil {
+ log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
+ continue
+ }
+ err = d.sock.Send([]byte(d.service))
+ if err != nil {
+ log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
+ continue
+ }
select {
- case <-d.ctx.Done():
- return
-
+ case d.heartbeatmsg <- msg:
+ log.Println("recv heartbeat msg. ", msg)
default:
- err = d.sock.Send([]byte(d.service))
- if err != nil {
- log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
- }
}
}
}
@@ -147,14 +159,14 @@
url: url,
ctx: ctx,
sock: sock,
- changes: make(chan []string, 8),
+ changes: make(chan []byte, 8),
}
go subscriber.run()
return subscriber, nil
}
-func (s *Subscriber) Changes() chan []string {
+func (s *Subscriber) Changes() chan []byte {
return s.changes
}
@@ -171,15 +183,8 @@
msg, err = s.sock.Recv()
if err != nil {
log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
-
}
-
- // non-blocking send to the channel, discards changes if the channel is not ready
- select {
- case s.changes <- strings.Split(string(msg), "|"):
- default:
- }
-
+ s.changes <- msg
}
}
}
--
Gitblit v1.8.0