龙赣华
2019-06-03 5860c5fd3e6ae9f5412c12cbf16086e8585f3ef1
add heartbeat channels
1个文件已修改
69 ■■■■ 已修改文件
client.go 69 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client.go
@@ -3,7 +3,6 @@
import (
    "errors"
    "log"
    "strings"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/respondent"
@@ -27,6 +26,8 @@
    // http://192.168.1.1:8080
    service string
    heartbeatmsg chan []byte
    ctx    context.Context
    cancel context.CancelFunc
    sock   mangos.Socket
@@ -41,7 +42,7 @@
    ctx  context.Context
    sock mangos.Socket
    changes chan []string
    changes chan []byte
}
func Client(urlServer string, service string) (*DiscoveryClient, error) {
@@ -76,20 +77,25 @@
    }
    client := &DiscoveryClient{
        urlServer:  urlServer,
        urlPubSub:  urlPubSub,
        service:    service,
        ctx:        ctx,
        cancel:     cancel,
        sock:       sock,
        subscriber: subscriber,
        urlServer:    urlServer,
        urlPubSub:    urlPubSub,
        service:      service,
        ctx:          ctx,
        cancel:       cancel,
        sock:         sock,
        heartbeatmsg: make(chan []byte),
        subscriber:   subscriber,
    }
    go client.run()
    return client, nil
}
func (d *DiscoveryClient) Peers() (chan []string, error) {
func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
    return d.heartbeatmsg
}
func (d *DiscoveryClient) Peers() (chan []byte, error) {
    if d.subscriber == nil {
        return nil, errors.New("No subscribe url is provided to discover the Peers")
    }
@@ -102,21 +108,27 @@
func (d *DiscoveryClient) run() {
    var err error
    var msg []byte
    for {
        _, err = d.sock.Recv()
        if err != nil {
            log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
        } else {
        select {
        case <-d.ctx.Done():
            close(d.heartbeatmsg)
            return
        default:
            msg, err = d.sock.Recv()
            if err != nil {
                log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
                continue
            }
            err = d.sock.Send([]byte(d.service))
            if err != nil {
                log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
                continue
            }
            select {
            case <-d.ctx.Done():
                return
            case d.heartbeatmsg <- msg:
                log.Println("recv heartbeat msg. ", msg)
            default:
                err = d.sock.Send([]byte(d.service))
                if err != nil {
                    log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
                }
            }
        }
    }
@@ -147,14 +159,14 @@
        url:     url,
        ctx:     ctx,
        sock:    sock,
        changes: make(chan []string, 8),
        changes: make(chan []byte, 8),
    }
    go subscriber.run()
    return subscriber, nil
}
func (s *Subscriber) Changes() chan []string {
func (s *Subscriber) Changes() chan []byte {
    return s.changes
}
@@ -171,15 +183,8 @@
            msg, err = s.sock.Recv()
            if err != nil {
                log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
            }
            // non-blocking send to the channel, discards changes if the channel is not ready
            select {
            case s.changes <- strings.Split(string(msg), "|"):
            default:
            }
            s.changes <- msg
        }
    }
}