package gopherdiscovery
|
|
import (
|
"encoding/json"
|
"errors"
|
"log"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/respondent"
|
"nanomsg.org/go-mangos/protocol/sub"
|
|
"golang.org/x/net/context"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
type DiscoveryClient struct {
|
// url for the survey heartbeat
|
// for example tcp://127.0.0.1:40007
|
urlServer string
|
// url for the Pub/Sub
|
// in this url you are going to get the changes on the set of nodes
|
// for example tcp://127.0.0.1:50007
|
urlPubSub string
|
|
// Service that needs to be discovered, for example for a web server could be
|
// http://192.168.1.1:8080
|
service ServiceInfo
|
|
heartbeatmsg chan []byte
|
|
ctx context.Context
|
cancel context.CancelFunc
|
sock mangos.Socket
|
|
subscriber *Subscriber
|
}
|
|
type Subscriber struct {
|
// url for the Pub/Sub
|
url string
|
|
ctx context.Context
|
sock mangos.Socket
|
|
changes chan []byte
|
}
|
|
// ServiceInfo is the payload a DiscoveryClient sends back, JSON-encoded, in
// response to each survey.
type ServiceInfo struct {
	// ServiceId identifies the advertised service; encoded as "serviceId".
	ServiceId string `json:"serviceId"`

	// Info is an arbitrary, caller-supplied value; encoded as "info".
	Info interface{} `json:"info"`
}
|
|
func Client(urlServer string, serviceId string) (*DiscoveryClient, error) {
|
return ClientWithSub(urlServer, "", serviceId)
|
}
|
|
func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) {
|
var sock mangos.Socket
|
var err error
|
var subscriber *Subscriber
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if urlPubSub != "" {
|
subCtx, _ := context.WithCancel(ctx)
|
subscriber, err = NewSubscriber(subCtx, urlPubSub)
|
if err != nil {
|
return nil, err
|
}
|
}
|
|
sock, err = respondent.NewSocket()
|
if err != nil {
|
return nil, err
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
err = sock.Dial(urlServer)
|
if err != nil {
|
return nil, err
|
}
|
|
svInfo := ServiceInfo{
|
ServiceId: serviceId,
|
}
|
client := &DiscoveryClient{
|
urlServer: urlServer,
|
urlPubSub: urlPubSub,
|
service: svInfo,
|
ctx: ctx,
|
cancel: cancel,
|
sock: sock,
|
heartbeatmsg: make(chan []byte),
|
subscriber: subscriber,
|
}
|
|
go client.run()
|
return client, nil
|
}
|
|
func (d *DiscoveryClient) HeartBeatMsg() chan []byte {
|
return d.heartbeatmsg
|
}
|
|
func (d *DiscoveryClient) Peers() (chan []byte, error) {
|
if d.subscriber == nil {
|
return nil, errors.New("No subscribe url is provided to discover the Peers")
|
}
|
return d.subscriber.Changes(), nil
|
}
|
|
func (d *DiscoveryClient) Cancel() {
|
d.cancel()
|
}
|
|
func (d *DiscoveryClient) SetResp(i interface{}) {
|
d.service.Info = i
|
}
|
|
func (d *DiscoveryClient) run() {
|
var err error
|
var msg []byte
|
for {
|
select {
|
case <-d.ctx.Done():
|
close(d.heartbeatmsg)
|
return
|
default:
|
msg, err = d.sock.Recv()
|
if err != nil {
|
log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error())
|
continue
|
}
|
sendB, err := json.Marshal(d.service)
|
if err != nil {
|
log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error())
|
continue
|
}
|
err = d.sock.Send(sendB)
|
if err != nil {
|
log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error())
|
continue
|
}
|
select {
|
case d.heartbeatmsg <- msg:
|
log.Println("recv heartbeat msg. ", msg)
|
default:
|
}
|
}
|
}
|
}
|
|
func NewSubscriber(ctx context.Context, url string) (*Subscriber, error) {
|
var sock mangos.Socket
|
var err error
|
|
sock, err = sub.NewSocket()
|
if err != nil {
|
return nil, err
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
|
err = sock.Dial(url)
|
if err != nil {
|
return nil, err
|
}
|
// subscribes to everything
|
err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
|
if err != nil {
|
return nil, err
|
}
|
|
subscriber := &Subscriber{
|
url: url,
|
ctx: ctx,
|
sock: sock,
|
changes: make(chan []byte, 8),
|
}
|
|
go subscriber.run()
|
return subscriber, nil
|
}
|
|
func (s *Subscriber) Changes() chan []byte {
|
return s.changes
|
}
|
|
func (s *Subscriber) run() {
|
var msg []byte
|
var err error
|
|
for {
|
select {
|
case <-s.ctx.Done():
|
close(s.changes)
|
return
|
default:
|
msg, err = s.sock.Recv()
|
if err != nil {
|
log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error())
|
}
|
s.changes <- msg
|
}
|
}
|
}
|