龙赣华
2019-06-03 5860c5fd3e6ae9f5412c12cbf16086e8585f3ef1
client.go
@@ -3,7 +3,6 @@
import (
   "errors"
   "log"
   "strings"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/respondent"
@@ -27,6 +26,8 @@
   // http://192.168.1.1:8080
   service string
   heartbeatmsg chan []byte
   ctx    context.Context
   cancel context.CancelFunc
   sock   mangos.Socket
@@ -41,7 +42,7 @@
   ctx  context.Context
   sock mangos.Socket
   changes chan []string
   changes chan []byte
}
func Client(urlServer string, service string) (*DiscoveryClient, error) {
@@ -76,20 +77,25 @@
   }
   client := &DiscoveryClient{
      urlServer:  urlServer,
      urlPubSub:  urlPubSub,
      service:    service,
      ctx:        ctx,
      cancel:     cancel,
      sock:       sock,
      subscriber: subscriber,
      urlServer:    urlServer,
      urlPubSub:    urlPubSub,
      service:      service,
      ctx:          ctx,
      cancel:       cancel,
      sock:         sock,
      heartbeatmsg: make(chan []byte),
      subscriber:   subscriber,
   }
   go client.run()
   return client, nil
}
func (d *DiscoveryClient) Peers() (chan []string, error) {
func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
   return d.heartbeatmsg
}
func (d *DiscoveryClient) Peers() (chan []byte, error) {
   if d.subscriber == nil {
      return nil, errors.New("No subscribe url is provided to discover the Peers")
   }
@@ -102,21 +108,27 @@
func (d *DiscoveryClient) run() {
   var err error
   var msg []byte
   for {
      _, err = d.sock.Recv()
      if err != nil {
         log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
      } else {
      select {
      case <-d.ctx.Done():
         close(d.heartbeatmsg)
         return
      default:
         msg, err = d.sock.Recv()
         if err != nil {
            log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
            continue
         }
         err = d.sock.Send([]byte(d.service))
         if err != nil {
            log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
            continue
         }
         select {
         case <-d.ctx.Done():
            return
         case d.heartbeatmsg <- msg:
            log.Println("recv heartbeat msg. ", msg)
         default:
            err = d.sock.Send([]byte(d.service))
            if err != nil {
               log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
            }
         }
      }
   }
@@ -147,14 +159,14 @@
      url:     url,
      ctx:     ctx,
      sock:    sock,
      changes: make(chan []string, 8),
      changes: make(chan []byte, 8),
   }
   go subscriber.run()
   return subscriber, nil
}
func (s *Subscriber) Changes() chan []string {
func (s *Subscriber) Changes() chan []byte {
   return s.changes
}
@@ -171,15 +183,8 @@
         msg, err = s.sock.Recv()
         if err != nil {
            log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
         }
         // non-blocking send to the channel, discards changes if the channel is not ready
         select {
         case s.changes <- strings.Split(string(msg), "|"):
         default:
         }
         s.changes <- msg
      }
   }
}