From 5860c5fd3e6ae9f5412c12cbf16086e8585f3ef1 Mon Sep 17 00:00:00 2001 From: 龙赣华 <slongertian@gmail.com> Date: 星期一, 03 六月 2019 09:34:53 +0800 Subject: [PATCH] add heartbeat channels --- client.go | 69 ++++++++++++++++++---------------- 1 files changed, 37 insertions(+), 32 deletions(-) diff --git a/client.go b/client.go index b70d4e2..2733153 100644 --- a/client.go +++ b/client.go @@ -3,7 +3,6 @@ import ( "errors" "log" - "strings" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/respondent" @@ -27,6 +26,8 @@ // http://192.168.1.1:8080 service string + heartbeatmsg chan []byte + ctx context.Context cancel context.CancelFunc sock mangos.Socket @@ -41,7 +42,7 @@ ctx context.Context sock mangos.Socket - changes chan []string + changes chan []byte } func Client(urlServer string, service string) (*DiscoveryClient, error) { @@ -76,20 +77,25 @@ } client := &DiscoveryClient{ - urlServer: urlServer, - urlPubSub: urlPubSub, - service: service, - ctx: ctx, - cancel: cancel, - sock: sock, - subscriber: subscriber, + urlServer: urlServer, + urlPubSub: urlPubSub, + service: service, + ctx: ctx, + cancel: cancel, + sock: sock, + heartbeatmsg: make(chan []byte), + subscriber: subscriber, } go client.run() return client, nil } -func (d *DiscoveryClient) Peers() (chan []string, error) { +func (d *DiscoveryClient) HeartBeatMsg() chan []byte { + return d.heartbeatmsg +} + +func (d *DiscoveryClient) Peers() (chan []byte, error) { if d.subscriber == nil { return nil, errors.New("No subscribe url is provided to discover the Peers") } @@ -102,21 +108,27 @@ func (d *DiscoveryClient) run() { var err error + var msg []byte for { - _, err = d.sock.Recv() - - if err != nil { - log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) - } else { + select { + case <-d.ctx.Done(): + close(d.heartbeatmsg) + return + default: + msg, err = d.sock.Recv() + if err != nil { + log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) + continue + } + err = d.sock.Send([]byte(d.service)) + if err != nil { + log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) + continue + } select { - case <-d.ctx.Done(): - return - + case d.heartbeatmsg <- msg: + log.Println("recv heartbeat msg. ", msg) default: - err = d.sock.Send([]byte(d.service)) - if err != nil { - log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) - } } } } @@ -147,14 +159,14 @@ url: url, ctx: ctx, sock: sock, - changes: make(chan []string, 8), + changes: make(chan []byte, 8), } go subscriber.run() return subscriber, nil } -func (s *Subscriber) Changes() chan []string { +func (s *Subscriber) Changes() chan []byte { return s.changes } @@ -171,15 +183,8 @@ msg, err = s.sock.Recv() if err != nil { log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error()) - } - - // non-blocking send to the channel, discards changes if the channel is not ready - select { - case s.changes <- strings.Split(string(msg), "|"): - default: - } - + s.changes <- msg } } } -- Gitblit v1.8.0