龙赣华
2019-06-03 cded666730b2aa9618165a3b319fd19272045017
Merge branch 'master' of ssh://192.168.1.14:29418/valib/gopherdiscovery
1个文件已修改
31 ■■■■ 已修改文件
server.go 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.go
@@ -2,7 +2,6 @@
import (
    "log"
    "strings"
    "time"
    "golang.org/x/net/context"
@@ -19,7 +18,7 @@
    // responses
    SurveyTime time.Duration
    // RecvDeadline is the time until the next recived of the SURVEY times out.
    RecvDeadline time.Duration
    //RecvDeadline time.Duration
    // PollTime is minimal time between SURVEYS (The time between SURVEYS could be greater than this time
    // if the SURVEY process takes longer than that time)
    PollTime time.Duration
@@ -59,7 +58,7 @@
    ctx  context.Context
    sock mangos.Socket
    publishCh chan []string
    publishCh chan []byte
}
func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) {
@@ -85,10 +84,10 @@
    if err != nil {
        return nil, err
    }
    err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
    if err != nil {
        return nil, err
    }
    //err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
    //if err != nil {
    //    return nil, err
    //}
    pubCtx, pubCancel := context.WithCancel(ctx)
    publisher, err = NewPublisher(pubCtx, urlPubSub)
@@ -140,7 +139,7 @@
    var msg []byte
    var responses StringSet
    err = d.sock.Send([]byte(""))
    err = d.sock.Send([]byte("are you ok?"))
    if err != nil {
        log.Println("DiscoveryServer: Error sending the SURVEY", err.Error())
        return
@@ -150,13 +149,11 @@
    for {
        msg, err = d.sock.Recv()
        if err != nil {
            if err == mangos.ErrRecvTimeout {
            if err == mangos.ErrProtoState {
                // Timeout means I can add the current responses to the SET
                d.services.Add(responses)
                return
            }
            //fmt.Println(" err: ",err.Error())
            //log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error())
        } else {
            responses.Add(string(msg))
        }
@@ -184,14 +181,14 @@
        url:  url,
        sock: sock,
        publishCh: make(chan []string),
        publishCh: make(chan []byte),
    }
    go publiser.run()
    return publiser, nil
}
func (p *Publisher) Publish(msg []string) {
func (p *Publisher) Publish(msg []byte) {
    p.publishCh <- msg
}
@@ -202,7 +199,7 @@
            close(p.publishCh)
            return
        case msg := <-p.publishCh:
            err := p.sock.Send([]byte(strings.Join(msg, "|")))
            err := p.sock.Send(msg)
            if err != nil {
                log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error())
            }
@@ -233,6 +230,6 @@
    //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed
}
func (d *DiscoveryServer) PublishMsg(msg string){
    d.services.publisher.Publish([]string{msg})
}
func (d *DiscoveryServer) PublishMsg(msg string) {
    d.services.publisher.Publish([]byte(msg))
}