package gopherdiscovery import ( "encoding/json" "errors" "log" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/respondent" "nanomsg.org/go-mangos/protocol/sub" "golang.org/x/net/context" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) type DiscoveryClient struct { // url for the survey heartbeat // for example tcp://127.0.0.1:40007 urlServer string // url for the Pub/Sub // in this url you are going to get the changes on the set of nodes // for example tcp://127.0.0.1:50007 urlPubSub string // Service that needs to be discovered, for example for a web server could be // http://192.168.1.1:8080 service ServiceInfo heartbeatmsg chan []byte ctx context.Context cancel context.CancelFunc sock mangos.Socket subscriber *Subscriber } type Subscriber struct { // url for the Pub/Sub url string ctx context.Context sock mangos.Socket changes chan []byte } type ServiceInfo struct { ServiceId string `json:"serviceId"` Info []byte `json:"info"` } func Client(urlServer string, serviceId string) (*DiscoveryClient, error) { return ClientWithSub(urlServer, "", serviceId) } func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) { var sock mangos.Socket var err error var subscriber *Subscriber ctx, cancel := context.WithCancel(context.Background()) if urlPubSub != "" { subCtx, _ := context.WithCancel(ctx) subscriber, err = NewSubscriber(subCtx, urlPubSub) if err != nil { return nil, err } } sock, err = respondent.NewSocket() if err != nil { return nil, err } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) err = sock.Dial(urlServer) if err != nil { return nil, err } svInfo := ServiceInfo{ ServiceId: serviceId, } client := &DiscoveryClient{ urlServer: urlServer, urlPubSub: urlPubSub, service: svInfo, ctx: ctx, cancel: cancel, sock: sock, heartbeatmsg: make(chan []byte), subscriber: subscriber, } go client.run() return client, nil } func (d *DiscoveryClient) HeartBeatMsg() chan []byte { return d.heartbeatmsg } func (d *DiscoveryClient) Peers() (chan []byte, error) { if d.subscriber == nil { return nil, errors.New("No subscribe url is provided to discover the Peers") } return d.subscriber.Changes(), nil } func (d *DiscoveryClient) Cancel() { d.cancel() } func (d *DiscoveryClient) SetResp(i []byte) { d.service.Info = i } func (d *DiscoveryClient) run() { var err error var msg []byte for { select { case <-d.ctx.Done(): close(d.heartbeatmsg) return default: msg, err = d.sock.Recv() if err != nil { log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) continue } sendB, err := json.Marshal(d.service) if err != nil { log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error()) continue } err = d.sock.Send(sendB) if err != nil { log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) continue } select { case d.heartbeatmsg <- msg: log.Println("recv heartbeat msg. ", msg) default: } } } } func NewSubscriber(ctx context.Context, url string) (*Subscriber, error) { var sock mangos.Socket var err error sock, err = sub.NewSocket() if err != nil { return nil, err } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) err = sock.Dial(url) if err != nil { return nil, err } // subscribes to everything err = sock.SetOption(mangos.OptionSubscribe, []byte("")) if err != nil { return nil, err } subscriber := &Subscriber{ url: url, ctx: ctx, sock: sock, changes: make(chan []byte, 8), } go subscriber.run() return subscriber, nil } func (s *Subscriber) Changes() chan []byte { return s.changes } func (s *Subscriber) run() { var msg []byte var err error for { select { case <-s.ctx.Done(): close(s.changes) return default: msg, err = s.sock.Recv() if err != nil { log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error()) } s.changes <- msg } } }