| | |
| | | package gopherdiscovery |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "log" |
| | | "strings" |
| | | |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/respondent" |
| | |
| | | |
| | | // Service that needs to be discovered, for example for a web server could be |
| | | // http://192.168.1.1:8080 |
| | | service string |
| | | service ServiceInfo |
| | | |
| | | heartbeatmsg chan []byte |
| | | |
| | | ctx context.Context |
| | | cancel context.CancelFunc |
| | |
| | | ctx context.Context |
| | | sock mangos.Socket |
| | | |
| | | changes chan []string |
| | | changes chan []byte |
| | | } |
| | | |
| | | func Client(urlServer string, service string) (*DiscoveryClient, error) { |
| | | return ClientWithSub(urlServer, "", service) |
| | | type ServiceInfo struct { |
| | | ServiceId string `json:"serviceId"` |
| | | Info []byte `json:"info"` |
| | | } |
| | | |
| | | func ClientWithSub(urlServer string, urlPubSub string, service string) (*DiscoveryClient, error) { |
| | | func Client(urlServer string, serviceId string) (*DiscoveryClient, error) { |
| | | return ClientWithSub(urlServer, "", serviceId) |
| | | } |
| | | |
| | | func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) { |
| | | var sock mangos.Socket |
| | | var err error |
| | | var subscriber *Subscriber |
| | |
| | | return nil, err |
| | | } |
| | | |
| | | svInfo := ServiceInfo{ |
| | | ServiceId: serviceId, |
| | | } |
| | | client := &DiscoveryClient{ |
| | | urlServer: urlServer, |
| | | urlPubSub: urlPubSub, |
| | | service: service, |
| | | ctx: ctx, |
| | | cancel: cancel, |
| | | sock: sock, |
| | | subscriber: subscriber, |
| | | urlServer: urlServer, |
| | | urlPubSub: urlPubSub, |
| | | service: svInfo, |
| | | ctx: ctx, |
| | | cancel: cancel, |
| | | sock: sock, |
| | | heartbeatmsg: make(chan []byte), |
| | | subscriber: subscriber, |
| | | } |
| | | |
| | | go client.run() |
| | | return client, nil |
| | | } |
| | | |
| | | func (d *DiscoveryClient) Peers() (chan []string, error) { |
| | | func (d *DiscoveryClient) HeartBeatMsg() chan []byte { |
| | | return d.heartbeatmsg |
| | | } |
| | | |
| | | func (d *DiscoveryClient) Peers() (chan []byte, error) { |
| | | if d.subscriber == nil { |
| | | return nil, errors.New("No subscribe url is provided to discover the Peers") |
| | | } |
| | |
| | | d.cancel() |
| | | } |
| | | |
| | | func (d *DiscoveryClient) SetResp(i []byte) { |
| | | d.service.Info = i |
| | | } |
| | | |
| | | func (d *DiscoveryClient) run() { |
| | | var err error |
| | | var msg []byte |
| | | for { |
| | | _, err = d.sock.Recv() |
| | | |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) |
| | | } else { |
| | | select { |
| | | case <-d.ctx.Done(): |
| | | close(d.heartbeatmsg) |
| | | return |
| | | default: |
| | | msg, err = d.sock.Recv() |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) |
| | | continue |
| | | } |
| | | sendB, err := json.Marshal(d.service) |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error()) |
| | | continue |
| | | } |
| | | err = d.sock.Send(sendB) |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) |
| | | continue |
| | | } |
| | | select { |
| | | case <-d.ctx.Done(): |
| | | return |
| | | |
| | | case d.heartbeatmsg <- msg: |
| | | log.Println("recv heartbeat msg. ", msg) |
| | | default: |
| | | err = d.sock.Send([]byte(d.service)) |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | url: url, |
| | | ctx: ctx, |
| | | sock: sock, |
| | | changes: make(chan []string, 8), |
| | | changes: make(chan []byte, 8), |
| | | } |
| | | |
| | | go subscriber.run() |
| | | return subscriber, nil |
| | | } |
| | | |
| | | func (s *Subscriber) Changes() chan []string { |
| | | func (s *Subscriber) Changes() chan []byte { |
| | | return s.changes |
| | | } |
| | | |
| | |
| | | msg, err = s.sock.Recv() |
| | | if err != nil { |
| | | log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error()) |
| | | |
| | | } |
| | | |
| | | // non-blocking send to the channel, discards changes if the channel is not ready |
| | | select { |
| | | case s.changes <- strings.Split(string(msg), "|"): |
| | | default: |
| | | } |
| | | |
| | | s.changes <- msg |
| | | } |
| | | } |
| | | } |