liuxiaolong
2020-03-11 1fc291e69cf5cabe8cb743d78f77fb34519495f5
server.go
@@ -1,9 +1,8 @@
package gopherdiscovery
import (
   "fmt"
   "encoding/json"
   "log"
   "strings"
   "time"
   "golang.org/x/net/context"
@@ -20,7 +19,7 @@
   // responses
   SurveyTime time.Duration
   // RecvDeadline is the time until the next recived of the SURVEY times out.
   RecvDeadline time.Duration
   //RecvDeadline time.Duration
   // PollTime is minimal time between SURVEYS (The time between SURVEYS could be greater than this time
   // if the SURVEY process takes longer than that time)
   PollTime time.Duration
@@ -51,6 +50,8 @@
   nodes StringSet
   // publisher, we are going to publish the changes of the set here
   publisher *Publisher
   svInfo map[string]interface{}
}
type Publisher struct {
@@ -60,7 +61,7 @@
   ctx  context.Context
   sock mangos.Socket
   publishCh chan []string
   publishCh chan []byte
}
func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) {
@@ -86,10 +87,10 @@
   if err != nil {
      return nil, err
   }
   err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
   if err != nil {
      return nil, err
   }
   //err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline)
   //if err != nil {
   //   return nil, err
   //}
   pubCtx, pubCancel := context.WithCancel(ctx)
   publisher, err = NewPublisher(pubCtx, urlPubSub)
@@ -140,6 +141,7 @@
   var err error
   var msg []byte
   var responses StringSet
   var si ServiceInfo
   err = d.sock.Send([]byte("are you ok?"))
   if err != nil {
@@ -151,21 +153,18 @@
   for {
      msg, err = d.sock.Recv()
      if err != nil {
         if err == mangos.ErrRecvTimeout {//在超时时间内收到的响应
         if err == mangos.ErrProtoState {
            // Timeout means I can add the current responses to the SET
            fmt.Println("mangos.ErrRecvTimeout")
            d.services.Add(responses)
            return
         }
         //fmt.Println(" err: ",err.Error())
         //log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error())
      } else {
         fmt.Printf("poll received msg:%s \n",string(msg))
         responses.Add(string(msg))
         //break
         if  json.Unmarshal(msg, &si) == nil {
            responses.Add(si.ServiceId)
            d.services.svInfo[si.ServiceId] = si.Info
         }
      }
   }
   fmt.Println("for out")
}
func NewPublisher(ctx context.Context, url string) (*Publisher, error) {
@@ -189,14 +188,14 @@
      url:  url,
      sock: sock,
      publishCh: make(chan []string),
      publishCh: make(chan []byte),
   }
   go publiser.run()
   return publiser, nil
}
func (p *Publisher) Publish(msg []string) {
func (p *Publisher) Publish(msg []byte) {
   p.publishCh <- msg
}
@@ -207,7 +206,7 @@
         close(p.publishCh)
         return
      case msg := <-p.publishCh:
         err := p.sock.Send([]byte(strings.Join(msg, "|")))
         err := p.sock.Send(msg)
         if err != nil {
            log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error())
         }
@@ -219,6 +218,7 @@
   s := &Services{
      nodes:     NewStringSet(),
      publisher: publisher,
      svInfo: make(map[string]interface{}, 0),
   }
   return s
@@ -235,9 +235,21 @@
   s.nodes = responses
   // publish the changes
   s.publisher.Publish(s.nodes.ToSlice())
   //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed
}
func (d *DiscoveryServer) PublishMsg(msg string){
   d.services.publisher.Publish([]string{msg})
}
func (d *DiscoveryServer) AliveNodes() StringSet {
   return d.services.nodes
}
func (d *DiscoveryServer) SvInfo() map[string]interface{} {
   m := make(map[string]interface{})
   for k,v := range d.services.svInfo {
      m[k] = v
   }
   return m
}
func (d *DiscoveryServer) PublishMsg(msg string) {
   d.services.publisher.Publish([]byte(msg))
}