| | |
| | | package gopherdiscovery |
| | | |
| | | import ( |
| | | "fmt" |
| | | "log" |
| | | "strings" |
| | | "time" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | |
| | | "golang.org/x/net/context" |
| | | |
| | |
| | | // responses |
| | | SurveyTime time.Duration |
| | | // RecvDeadline is the time until the next receive of the SURVEY times out. |
| | | RecvDeadline time.Duration |
| | | //RecvDeadline time.Duration |
| | | // PollTime is the minimal time between SURVEYS (the time between SURVEYS could be greater than this |
| | | // if the SURVEY process takes longer than that). |
| | | PollTime time.Duration |
| | |
| | | ctx context.Context |
| | | sock mangos.Socket |
| | | |
| | | publishCh chan []string |
| | | publishCh chan []byte |
| | | } |
| | | |
| | | func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) { |
| | |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | //err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline) |
| | | //if err != nil { |
| | | // return nil, err |
| | | //} |
| | | |
| | | pubCtx, pubCancel := context.WithCancel(ctx) |
| | | publisher, err = NewPublisher(pubCtx, urlPubSub) |
| | |
| | | for { |
| | | msg, err = d.sock.Recv() |
| | | if err != nil { |
| | | if err == mangos.ErrRecvTimeout {//在超时时间内收到的响应 |
| | | if err == mangos.ErrProtoState { |
| | | // Timeout means I can add the current responses to the SET |
| | | fmt.Println("mangos.ErrRecvTimeout") |
| | | d.services.Add(responses) |
| | | return |
| | | } |
| | | //fmt.Println(" err: ",err.Error()) |
| | | //log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error()) |
| | | } else { |
| | | fmt.Printf("poll received msg:%s \n",string(msg)) |
| | | responses.Add(string(msg)) |
| | | //break |
| | | } |
| | | } |
| | | fmt.Println("for out") |
| | | } |
| | | |
| | | func NewPublisher(ctx context.Context, url string) (*Publisher, error) { |
| | |
| | | url: url, |
| | | sock: sock, |
| | | |
| | | publishCh: make(chan []string), |
| | | publishCh: make(chan []byte), |
| | | } |
| | | |
| | | go publiser.run() |
| | | return publiser, nil |
| | | } |
| | | |
| | | func (p *Publisher) Publish(msg []string) { |
| | | func (p *Publisher) Publish(msg []byte) { |
| | | p.publishCh <- msg |
| | | } |
| | | |
| | |
| | | close(p.publishCh) |
| | | return |
| | | case msg := <-p.publishCh: |
| | | err := p.sock.Send([]byte(strings.Join(msg, "|"))) |
| | | err := p.sock.Send(msg) |
| | | if err != nil { |
| | | log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error()) |
| | | } |
| | |
| | | |
| | | s.nodes = responses |
| | | // publish the changes |
| | | s.publisher.Publish(s.nodes.ToSlice()) |
| | | //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed |
| | | } |
| | | |
| | | func (d *DiscoveryServer) PublishMsg(msg protomsg.PublishMessage){ |
| | | d.services.publisher.Publish([]string{string(msg)}) |
| | | } |
| | | func (d *DiscoveryServer) PublishMsg(msg string) { |
| | | d.services.publisher.Publish([]byte(msg)) |
| | | } |