| | |
| | | package gopherdiscovery |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "log" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "golang.org/x/net/context" |
| | |
| | | nodes StringSet |
| | | // publisher, we are going to publish the changes of the set here |
| | | publisher *Publisher |
| | | |
| | | svInfo map[string][]byte |
| | | svLock sync.RWMutex |
| | | } |
| | | |
| | | type Publisher struct { |
| | |
| | | ctx context.Context |
| | | sock mangos.Socket |
| | | |
| | | publishCh chan []string |
| | | publishCh chan []byte |
| | | } |
| | | |
| | | func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) { |
| | |
| | | var err error |
| | | var msg []byte |
| | | var responses StringSet |
| | | var si ServiceInfo |
| | | |
| | | err = d.sock.Send([]byte("are you ok?")) |
| | | if err != nil { |
| | |
| | | } |
| | | |
| | | responses = NewStringSet() |
| | | d.services.svInfo = make(map[string][]byte, 0) |
| | | for { |
| | | msg, err = d.sock.Recv() |
| | | if err != nil { |
| | |
| | | d.services.Add(responses) |
| | | return |
| | | } |
| | | //fmt.Println(" err: ",err.Error()) |
| | | //log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error()) |
| | | } else { |
| | | responses.Add(string(msg)) |
| | | if json.Unmarshal(msg, &si) == nil { |
| | | responses.Add(si.ServiceId) |
| | | d.services.svLock.Lock() |
| | | d.services.svInfo[si.ServiceId] = si.Info |
| | | d.services.svLock.Unlock() |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | url: url, |
| | | sock: sock, |
| | | |
| | | publishCh: make(chan []string), |
| | | publishCh: make(chan []byte), |
| | | } |
| | | |
| | | go publiser.run() |
| | | return publiser, nil |
| | | } |
| | | |
| | | func (p *Publisher) Publish(msg []string) { |
| | | func (p *Publisher) Publish(msg []byte) { |
| | | p.publishCh <- msg |
| | | } |
| | | |
| | |
| | | close(p.publishCh) |
| | | return |
| | | case msg := <-p.publishCh: |
| | | err := p.sock.Send([]byte(strings.Join(msg, "|"))) |
| | | err := p.sock.Send(msg) |
| | | if err != nil { |
| | | log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error()) |
| | | } |
| | |
| | | s := &Services{ |
| | | nodes: NewStringSet(), |
| | | publisher: publisher, |
| | | svInfo: make(map[string][]byte, 0), |
| | | } |
| | | |
| | | return s |
| | |
| | | //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed |
| | | } |
| | | |
| | | func (d *DiscoveryServer) PublishMsg(msg string){ |
| | | d.services.publisher.Publish([]string{msg}) |
| | | } |
| | | func (d *DiscoveryServer) AliveNodes() StringSet { |
| | | return d.services.nodes |
| | | } |
| | | |
| | | func (d *DiscoveryServer) SvInfo() map[string][]byte { |
| | | d.services.svLock.Lock() |
| | | defer d.services.svLock.Unlock() |
| | | m := make(map[string][]byte) |
| | | for k,v := range d.services.svInfo { |
| | | m[k] = v |
| | | } |
| | | return m |
| | | } |
| | | |
| | | func (d *DiscoveryServer) PublishMsg(msg string) { |
| | | d.services.publisher.Publish([]byte(msg)) |
| | | } |