From fb64156e926cfd98d0b4891543bdb47151272486 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期一, 13 一月 2020 16:35:24 +0800 Subject: [PATCH] fix --- mangos.go | 154 ++++++++++++++++++++++++-------------------------- 1 files changed, 74 insertions(+), 80 deletions(-) diff --git a/mangos.go b/mangos.go index 50b56f5..169c2c4 100644 --- a/mangos.go +++ b/mangos.go @@ -1,116 +1,110 @@ package pubsub import ( - "context" + "basic.com/valib/gopherdiscovery.git" "encoding/json" "fmt" - "nanomsg.org/go-mangos" - "nanomsg.org/go-mangos/protocol/pub" - "nanomsg.org/go-mangos/protocol/sub" - "nanomsg.org/go-mangos/transport/ipc" - "nanomsg.org/go-mangos/transport/tcp" + "time" ) type mangosPubSub struct { url string - - ctx context.Context - - sock mangos.Socket - - pubCh chan []byte //publish msg chan + heartBeatUrl string + pubCh chan Message //publish msg chan + aliveNodes gopherdiscovery.StringSet recvCh chan Message //recv msg chan } -func newPub(url string) (*mangosPubSub,error) { - var sock mangos.Socket + +func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) { + var discoveryServer *gopherdiscovery.DiscoveryServer var err error + var ( + defaultOpts = gopherdiscovery.Options{ + SurveyTime: 3 * time.Second, + //RecvDeadline: 3 * time.Second, + PollTime: 5 * time.Second, + } + ) - sock, err = pub.NewSocket() - if err != nil { - return nil, err - } - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) + discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts) - err = sock.Listen(url) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) + fmt.Println("err:",err) pub := &mangosPubSub{ - url: url, - ctx: ctx, - sock: sock, - pubCh: make(chan []byte), + url: publishUrl, + heartBeatUrl: heartBeatUrl, + aliveNodes: gopherdiscovery.NewStringSet(), + pubCh: make(chan Message, 50), } + var msgCache = make(map[string]Message) + //clientMsgCh := make(map[string]chan Message) + cacheNodes := gopherdiscovery.NewStringSet() go func() { for { select { - case <-ctx.Done(): - close(pub.pubCh) - cancel() - return case msg := <-pub.pubCh: - err := pub.sock.Send(msg) - if err != nil { - fmt.Println("Error PUBLISH MSG to the socket:", err.Error()) + msgCache[msg.Topic] = msg + //if len(clientMsgCh) > 0 { + // for _, ch := range clientMsgCh { + // ch <- msg + // } + //} + if cacheNodes.Cardinality() >0 { + sendB, _ := json.Marshal(msg) + discoveryServer.PublishMsg(string(sendB)) + } + default: + nodeIds := discoveryServer.AliveNodes().ToSlice() + if len(nodeIds) >0 { + //for _,nodeId := range nodeIds { + // if _,ok := clientMsgCh[nodeId]; !ok { + // clientMsgCh[nodeId] = make(chan Message) + // } + //} + + if cacheNodes.Cardinality() == 0 { //绗竴娆℃湁涓婄嚎鐨勮妭鐐� + if len(msgCache) > 0 { + for _,cMsg := range msgCache { + sendB, _ := json.Marshal(cMsg) + discoveryServer.PublishMsg(string(sendB)) + } + } + } + cacheNodes = discoveryServer.AliveNodes() + } else { + time.Sleep(10 * time.Millisecond) } } } }() + go func() { + + }() return pub,nil } -func newSub(url string, topics []string) (*mangosPubSub,error) { - var sock mangos.Socket - var err error - - sock, err = sub.NewSocket() - if err != nil { - return nil, err +func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { + client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) + if err !=nil { + return nil,err } - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) - - err = sock.Dial(url) - if err != nil { - return nil, err - } - // subscribes to everything - err = sock.SetOption(mangos.OptionSubscribe, []byte("")) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) + heartMsg := client.HeartBeatMsg() + _= <-heartMsg + fmt.Println("heat beat with server success") sub := &mangosPubSub{ - url:url, - ctx: ctx, - sock: sock, + url:subcribeUrl, + heartBeatUrl: heartBeatUrl, recvCh: make(chan Message,50), } - - var msg []byte go func() { - for { - select { - case <-ctx.Done(): - close(sub.recvCh) - cancel() - return - default: - msg, err = sub.sock.Recv() - if err != nil { - fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error()) - } else { - //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅 - var recvMsg Message - if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil { - if matchTopic(recvMsg.Topic, topics) { - sub.recvCh <- recvMsg - } - } + peers, _ := client.Peers() + for msg := range peers { + //鍒ゆ柇鏄惁鏄兂瑕佺殑涓婚娑堟伅 + var recvMsg Message + if err := json.Unmarshal(msg, &recvMsg);err ==nil { + if matchTopic(recvMsg.Topic, topics) { + sub.recvCh <- recvMsg } } } @@ -131,7 +125,7 @@ return false } -func (ps *mangosPubSub) Publish(msg []byte) { +func (ps *mangosPubSub) Publish(msg Message) { ps.pubCh <- msg } -- Gitblit v1.8.0