From a4ea6380ed70468b1bbaca0328a65686960fcd52 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期一, 20 四月 2020 16:23:30 +0800 Subject: [PATCH] add svLock --- client.go | 95 +++++++++++++++++++++++++++++------------------ 1 files changed, 59 insertions(+), 36 deletions(-) diff --git a/client.go b/client.go index b70d4e2..d9a211b 100644 --- a/client.go +++ b/client.go @@ -1,9 +1,9 @@ package gopherdiscovery import ( + "encoding/json" "errors" "log" - "strings" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/respondent" @@ -25,7 +25,9 @@ // Service that needs to be discovered, for example for a web server could be // http://192.168.1.1:8080 - service string + service ServiceInfo + + heartbeatmsg chan []byte ctx context.Context cancel context.CancelFunc @@ -41,14 +43,19 @@ ctx context.Context sock mangos.Socket - changes chan []string + changes chan []byte } -func Client(urlServer string, service string) (*DiscoveryClient, error) { - return ClientWithSub(urlServer, "", service) +type ServiceInfo struct { + ServiceId string `json:"serviceId"` + Info []byte `json:"info"` } -func ClientWithSub(urlServer string, urlPubSub string, service string) (*DiscoveryClient, error) { +func Client(urlServer string, serviceId string) (*DiscoveryClient, error) { + return ClientWithSub(urlServer, "", serviceId) +} + +func ClientWithSub(urlServer string, urlPubSub string, serviceId string) (*DiscoveryClient, error) { var sock mangos.Socket var err error var subscriber *Subscriber @@ -75,21 +82,29 @@ return nil, err } + svInfo := ServiceInfo{ + ServiceId: serviceId, + } client := &DiscoveryClient{ - urlServer: urlServer, - urlPubSub: urlPubSub, - service: service, - ctx: ctx, - cancel: cancel, - sock: sock, - subscriber: subscriber, + urlServer: urlServer, + urlPubSub: urlPubSub, + service: svInfo, + ctx: ctx, + cancel: cancel, + sock: sock, + heartbeatmsg: make(chan []byte), + subscriber: subscriber, } go client.run() return client, nil } -func (d *DiscoveryClient) Peers() (chan []string, error) { +func (d *DiscoveryClient) HeartBeatMsg() chan []byte { + return d.heartbeatmsg +} + +func (d *DiscoveryClient) Peers() (chan []byte, error) { if d.subscriber == nil { return nil, errors.New("No subscribe url is provided to discover the Peers") } @@ -100,23 +115,38 @@ d.cancel() } +func (d *DiscoveryClient) SetResp(i []byte) { + d.service.Info = i +} + func (d *DiscoveryClient) run() { var err error + var msg []byte for { - _, err = d.sock.Recv() - - if err != nil { - log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) - } else { + select { + case <-d.ctx.Done(): + close(d.heartbeatmsg) + return + default: + msg, err = d.sock.Recv() + if err != nil { + log.Println("DiscoveryClient: Cannot receive the SURVEY", err.Error()) + continue + } + sendB, err := json.Marshal(d.service) + if err != nil { + log.Println("DiscoveryClient: marshal d.serviceInfo err", err.Error()) + continue + } + err = d.sock.Send(sendB) + if err != nil { + log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) + continue + } select { - case <-d.ctx.Done(): - return - + case d.heartbeatmsg <- msg: + log.Println("recv heartbeat msg. ", msg) default: - err = d.sock.Send([]byte(d.service)) - if err != nil { - log.Println("DiscoveryClient: Cannot send the SURVEY response", err.Error()) - } } } } @@ -147,14 +177,14 @@ url: url, ctx: ctx, sock: sock, - changes: make(chan []string, 8), + changes: make(chan []byte, 8), } go subscriber.run() return subscriber, nil } -func (s *Subscriber) Changes() chan []string { +func (s *Subscriber) Changes() chan []byte { return s.changes } @@ -171,15 +201,8 @@ msg, err = s.sock.Recv() if err != nil { log.Println("DiscoveryClient: Cannot SUBSCRIBE to the changes", err.Error()) - } - - // non-blocking send to the channel, discards changes if the channel is not ready - select { - case s.changes <- strings.Split(string(msg), "|"): - default: - } - + s.changes <- msg } } } -- Gitblit v1.8.0