package gopherdiscovery import ( "log" "time" "golang.org/x/net/context" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/pub" "nanomsg.org/go-mangos/protocol/surveyor" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) type Options struct { // SurveyTime is used to indicate the deadline for survey // responses SurveyTime time.Duration // RecvDeadline is the time until the next recived of the SURVEY times out. //RecvDeadline time.Duration // PollTime is minimal time between SURVEYS (The time between SURVEYS could be greater than this time // if the SURVEY process takes longer than that time) PollTime time.Duration } type DiscoveryServer struct { // url for the survey heartbeat // for example tcp://127.0.0.1:40007 urlServer string // url for the Pub/Sub // in this url you are going to get the changes on the set of nodes // for example tcp://127.0.0.1:50007 urlPubSub string // Time options opt Options // Set of the services that has been discovered services *Services ctx context.Context cancel context.CancelFunc sock mangos.Socket } type Services struct { // set of nodes discovered nodes StringSet // publisher, we are going to publish the changes of the set here publisher *Publisher } type Publisher struct { // url for pub/sub url string ctx context.Context sock mangos.Socket publishCh chan []byte } func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) { var sock mangos.Socket var err error var publisher *Publisher ctx, cancel := context.WithCancel(context.Background()) sock, err = surveyor.NewSocket() if err != nil { return nil, err } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) err = sock.Listen(urlServer) if err != nil { return nil, err } err = sock.SetOption(mangos.OptionSurveyTime, opt.SurveyTime) if err != nil { return nil, err } //err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline) //if err != nil { // return nil, err //} pubCtx, pubCancel := context.WithCancel(ctx) publisher, err = NewPublisher(pubCtx, urlPubSub) if err != nil { pubCancel() return nil, err } services := NewServices(publisher) server := &DiscoveryServer{ services: services, urlServer: urlServer, urlPubSub: urlPubSub, opt: opt, ctx: ctx, cancel: cancel, sock: sock, } go server.run() return server, nil } // Shutdown the server func (d *DiscoveryServer) Cancel() { d.cancel() } // Waits until the server finish running func (d *DiscoveryServer) Wait() { <-d.ctx.Done() } func (d *DiscoveryServer) run() { for { select { case <-time.After(d.opt.PollTime): d.poll() case <-d.ctx.Done(): return } } } func (d *DiscoveryServer) poll() { var err error var msg []byte var responses StringSet err = d.sock.Send([]byte("are you ok?")) if err != nil { log.Println("DiscoveryServer: Error sending the SURVEY", err.Error()) return } responses = NewStringSet() for { msg, err = d.sock.Recv() if err != nil { if err == mangos.ErrProtoState { // Timeout means I can add the current responses to the SET d.services.Add(responses) return } } else { responses.Add(string(msg)) } } } func NewPublisher(ctx context.Context, url string) (*Publisher, error) { var sock mangos.Socket var err error sock, err = pub.NewSocket() if err != nil { return nil, err } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) err = sock.Listen(url) if err != nil { return nil, err } publiser := &Publisher{ ctx: ctx, url: url, sock: sock, publishCh: make(chan []byte), } go publiser.run() return publiser, nil } func (p *Publisher) Publish(msg []byte) { p.publishCh <- msg } func (p *Publisher) run() { for { select { case <-p.ctx.Done(): close(p.publishCh) return case msg := <-p.publishCh: err := p.sock.Send(msg) if err != nil { log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error()) } } } } func NewServices(publisher *Publisher) *Services { s := &Services{ nodes: NewStringSet(), publisher: publisher, } return s } func (s *Services) Add(responses StringSet) { removed := s.nodes.Difference(responses) added := responses.Difference(s.nodes) // Do not publish anything if there is no changes if removed.Cardinality() == 0 && added.Cardinality() == 0 { return } s.nodes = responses // publish the changes //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed } func (d *DiscoveryServer) AliveNodes() StringSet { return d.services.nodes } func (d *DiscoveryServer) PublishMsg(msg string) { d.services.publisher.Publish([]byte(msg)) }