From a4ea6380ed70468b1bbaca0328a65686960fcd52 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期一, 20 四月 2020 16:23:30 +0800 Subject: [PATCH] add svLock --- server.go | 67 +++++++++++++++++++++------------ 1 files changed, 42 insertions(+), 25 deletions(-) diff --git a/server.go b/server.go index 770732b..f23728c 100644 --- a/server.go +++ b/server.go @@ -1,12 +1,10 @@ package gopherdiscovery import ( - "fmt" + "encoding/json" "log" - "strings" + "sync" "time" - - "basic.com/pubsub/protomsg.git" "golang.org/x/net/context" @@ -22,7 +20,7 @@ // responses SurveyTime time.Duration // RecvDeadline is the time until the next recived of the SURVEY times out. - RecvDeadline time.Duration + //RecvDeadline time.Duration // PollTime is minimal time between SURVEYS (The time between SURVEYS could be greater than this time // if the SURVEY process takes longer than that time) PollTime time.Duration @@ -53,6 +51,9 @@ nodes StringSet // publisher, we are going to publish the changes of the set here publisher *Publisher + + svInfo map[string][]byte + svLock sync.RWMutex } type Publisher struct { @@ -62,7 +63,7 @@ ctx context.Context sock mangos.Socket - publishCh chan []string + publishCh chan []byte } func Server(urlServer string, urlPubSub string, opt Options) (*DiscoveryServer, error) { @@ -88,10 +89,10 @@ if err != nil { return nil, err } - err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline) - if err != nil { - return nil, err - } + //err = sock.SetOption(mangos.OptionRecvDeadline, opt.RecvDeadline) + //if err != nil { + // return nil, err + //} pubCtx, pubCancel := context.WithCancel(ctx) publisher, err = NewPublisher(pubCtx, urlPubSub) @@ -142,6 +143,7 @@ var err error var msg []byte var responses StringSet + var si ServiceInfo err = d.sock.Send([]byte("are you ok?")) if err != nil { @@ -150,24 +152,24 @@ } responses = NewStringSet() + d.services.svInfo = make(map[string][]byte, 0) for { msg, err = d.sock.Recv() if err != nil { - if err == mangos.ErrRecvTimeout {//鍦ㄨ秴鏃舵椂闂村唴鏀跺埌鐨勫搷搴� + if err == mangos.ErrProtoState { // Timeout means I can add the current responses to the SET - fmt.Println("mangos.ErrRecvTimeout") d.services.Add(responses) return } - //fmt.Println(" err: ",err.Error()) - //log.Println("DiscoveryServer: Error reading SURVEY responses", err.Error()) } else { - fmt.Printf("poll received msg:%s \n",string(msg)) - responses.Add(string(msg)) - //break + if json.Unmarshal(msg, &si) == nil { + responses.Add(si.ServiceId) + d.services.svLock.Lock() + d.services.svInfo[si.ServiceId] = si.Info + d.services.svLock.Unlock() + } } } - fmt.Println("for out") } func NewPublisher(ctx context.Context, url string) (*Publisher, error) { @@ -191,14 +193,14 @@ url: url, sock: sock, - publishCh: make(chan []string), + publishCh: make(chan []byte), } go publiser.run() return publiser, nil } -func (p *Publisher) Publish(msg []string) { +func (p *Publisher) Publish(msg []byte) { p.publishCh <- msg } @@ -209,7 +211,7 @@ close(p.publishCh) return case msg := <-p.publishCh: - err := p.sock.Send([]byte(strings.Join(msg, "|"))) + err := p.sock.Send(msg) if err != nil { log.Println("DiscoveryServer: Error PUBLISHING changes to the socket", err.Error()) } @@ -221,6 +223,7 @@ s := &Services{ nodes: NewStringSet(), publisher: publisher, + svInfo: make(map[string][]byte, 0), } return s @@ -237,9 +240,23 @@ s.nodes = responses // publish the changes - s.publisher.Publish(s.nodes.ToSlice()) + //s.publisher.Publish(s.nodes.ToSlice())//publish nodes changed } -func (d *DiscoveryServer) PublishMsg(msg protomsg.PublishMessage){ - d.services.publisher.Publish([]string{string(msg)}) -} \ No newline at end of file +func (d *DiscoveryServer) AliveNodes() StringSet { + return d.services.nodes +} + +func (d *DiscoveryServer) SvInfo() map[string][]byte { + d.services.svLock.Lock() + defer d.services.svLock.Unlock() + m := make(map[string][]byte) + for k,v := range d.services.svInfo { + m[k] = v + } + return m +} + +func (d *DiscoveryServer) PublishMsg(msg string) { + d.services.publisher.Publish([]byte(msg)) +} -- Gitblit v1.8.0