liuxiaolong
2020-03-11 1fc291e69cf5cabe8cb743d78f77fb34519495f5
client.go
@@ -1,9 +1,9 @@
package gopherdiscovery
import (
   "encoding/json"
   "errors"
   "log"
   "strings"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/respondent"
@@ -25,7 +25,9 @@
   // Service that needs to be discovered, for example for a web server could be
   // http://192.168.1.1:8080
   service string
   service ServiceInfo
   heartbeatmsg chan []byte
   ctx    context.Context
   cancel context.CancelFunc
@@ -41,14 +43,19 @@
   ctx  context.Context
   sock mangos.Socket
   changes chan []string
   changes chan []byte
}
func Client(urlServer string, service string) (*DiscoveryClient, error) {
   return ClientWithSub(urlServer, "", service)
type ServiceInfo struct {
   ServiceId string `json:"serviceId"`
   Info interface{} `json:"info"`
}
func ClientWithSub(urlServer string, urlPubSub string, service string) (*DiscoveryClient, error) {
func Client(urlServer string, serviceId string) (*DiscoveryClient, error) {
   return ClientWithSub(urlServer, "", serviceId)
}
func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) {
   var sock mangos.Socket
   var err error
   var subscriber *Subscriber
@@ -75,21 +82,29 @@
      return nil, err
   }
   svInfo := ServiceInfo{
      ServiceId: serviceId,
   }
   client := &DiscoveryClient{
      urlServer:  urlServer,
      urlPubSub:  urlPubSub,
      service:    service,
      ctx:        ctx,
      cancel:     cancel,
      sock:       sock,
      subscriber: subscriber,
      urlServer:    urlServer,
      urlPubSub:    urlPubSub,
      service:      svInfo,
      ctx:          ctx,
      cancel:       cancel,
      sock:         sock,
      heartbeatmsg: make(chan []byte),
      subscriber:   subscriber,
   }
   go client.run()
   return client, nil
}
func (d *DiscoveryClient) Peers() (chan []string, error) {
func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
   return d.heartbeatmsg
}
func (d *DiscoveryClient) Peers() (chan []byte, error) {
   if d.subscriber == nil {
      return nil, errors.New("No subscribe url is provided to discover the Peers")
   }
@@ -100,27 +115,38 @@
   d.cancel()
}
func (d *DiscoveryClient) SetResp(i interface{}) {
   d.service.Info = i
}
func (d *DiscoveryClient) run() {
   var err error
   var surveyMsg []byte
   var msg []byte
   for {
      surveyMsg, err = d.sock.Recv()
      if err != nil {
         log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
      } else {
         log.Printf("client:%s received surveyor msg=%s ",d.service,string(surveyMsg))
      select {
      case <-d.ctx.Done():
         close(d.heartbeatmsg)
         return
      default:
         msg, err = d.sock.Recv()
         if err != nil {
            log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
            continue
         }
         sendB, err := json.Marshal(d.service)
         if err != nil {
            log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error())
            continue
         }
         err = d.sock.Send(sendB)
         if err != nil {
            log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
            continue
         }
         select {
         case <-d.ctx.Done():
            return
         case d.heartbeatmsg <- msg:
            log.Println("recv heartbeat msg. ", msg)
         default:
            err = d.sock.Send([]byte(d.service))
            if err != nil {
               log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
            }else {
               log.Printf("client:%s sent response i'm online",d.service)
            }
         }
      }
   }
@@ -151,14 +177,14 @@
      url:     url,
      ctx:     ctx,
      sock:    sock,
      changes: make(chan []string, 8),
      changes: make(chan []byte, 8),
   }
   go subscriber.run()
   return subscriber, nil
}
func (s *Subscriber) Changes() chan []string {
func (s *Subscriber) Changes() chan []byte {
   return s.changes
}
@@ -175,15 +201,8 @@
         msg, err = s.sock.Recv()
         if err != nil {
            log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
         }
         // non-blocking send to the channel, discards changes if the channel is not ready
         select {
         case s.changes <- strings.Split(string(msg), "|"):
         default:
         }
         s.changes <- msg
      }
   }
}