fix
liuxiaolong
2020-01-13 fb64156e926cfd98d0b4891543bdb47151272486
fix
2个文件已修改
163 ■■■■ 已修改文件
mangos.go 154 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pubsub.go 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mangos.go
@@ -1,116 +1,110 @@
package pubsub
import (
    "context"
    "basic.com/valib/gopherdiscovery.git"
    "encoding/json"
    "fmt"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pub"
    "nanomsg.org/go-mangos/protocol/sub"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "time"
)
type mangosPubSub struct {
    url string
    ctx  context.Context
    sock mangos.Socket
    pubCh chan []byte  //publish msg chan
    heartBeatUrl string
    pubCh chan Message  //publish msg chan
    aliveNodes gopherdiscovery.StringSet
    recvCh chan Message  //recv msg chan
}
func newPub(url string) (*mangosPubSub,error) {
    var sock mangos.Socket
func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
    var discoveryServer *gopherdiscovery.DiscoveryServer
    var err error
    var (
        defaultOpts = gopherdiscovery.Options{
            SurveyTime:   3 * time.Second,
            //RecvDeadline: 3 * time.Second,
            PollTime:     5 * time.Second,
        }
    )
    sock, err = pub.NewSocket()
    if err != nil {
        return nil, err
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
    err = sock.Listen(url)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    fmt.Println("err:",err)
    pub := &mangosPubSub{
        url: url,
        ctx: ctx,
        sock: sock,
        pubCh: make(chan []byte),
        url: publishUrl,
        heartBeatUrl: heartBeatUrl,
        aliveNodes: gopherdiscovery.NewStringSet(),
        pubCh: make(chan Message, 50),
    }
    var msgCache = make(map[string]Message)
    //clientMsgCh := make(map[string]chan Message)
    cacheNodes := gopherdiscovery.NewStringSet()
    go func() {
        for {
            select {
            case <-ctx.Done():
                close(pub.pubCh)
                cancel()
                return
            case msg := <-pub.pubCh:
                err := pub.sock.Send(msg)
                if err != nil {
                    fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
                msgCache[msg.Topic] = msg
                //if len(clientMsgCh) > 0 {
                //    for _, ch := range clientMsgCh {
                //        ch <- msg
                //    }
                //}
                if cacheNodes.Cardinality() >0 {
                    sendB, _ := json.Marshal(msg)
                    discoveryServer.PublishMsg(string(sendB))
                }
            default:
                nodeIds := discoveryServer.AliveNodes().ToSlice()
                if len(nodeIds) >0 {
                    //for _,nodeId := range nodeIds {
                    //    if _,ok := clientMsgCh[nodeId]; !ok {
                    //        clientMsgCh[nodeId] = make(chan Message)
                    //    }
                    //}
                    if cacheNodes.Cardinality() == 0 { //第一次有上线的节点
                        if len(msgCache) > 0 {
                            for _,cMsg := range msgCache {
                                sendB, _ := json.Marshal(cMsg)
                                discoveryServer.PublishMsg(string(sendB))
                            }
                        }
                    }
                    cacheNodes = discoveryServer.AliveNodes()
                } else {
                    time.Sleep(10 * time.Millisecond)
                }
            }
        }
    }()
    go func() {
    }()
    return pub,nil
}
func newSub(url string, topics []string) (*mangosPubSub,error) {
    var sock mangos.Socket
    var err error
    sock, err = sub.NewSocket()
    if err != nil {
        return nil, err
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
    client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
    if err !=nil {
        return nil,err
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    err = sock.Dial(url)
    if err != nil {
        return nil, err
    }
    // subscribes to everything
    err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    heartMsg := client.HeartBeatMsg()
    _= <-heartMsg
    fmt.Println("heat beat with server success")
    sub := &mangosPubSub{
        url:url,
        ctx: ctx,
        sock: sock,
        url:subcribeUrl,
        heartBeatUrl: heartBeatUrl,
        recvCh: make(chan Message,50),
    }
    var msg []byte
    go func() {
        for {
            select {
            case <-ctx.Done():
                close(sub.recvCh)
                cancel()
                return
            default:
                msg, err = sub.sock.Recv()
                if err != nil {
                    fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
                } else {
                    //判断是否是想要的主题消息
                    var recvMsg Message
                    if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
                        if matchTopic(recvMsg.Topic, topics) {
                            sub.recvCh <- recvMsg
                        }
                    }
        peers, _ := client.Peers()
        for msg := range peers {
            //判断是否是想要的主题消息
            var recvMsg Message
            if err := json.Unmarshal(msg, &recvMsg);err ==nil {
                if matchTopic(recvMsg.Topic, topics) {
                    sub.recvCh <- recvMsg
                }
            }
        }
@@ -131,7 +125,7 @@
    return false
}
func (ps *mangosPubSub) Publish(msg []byte) {
func (ps *mangosPubSub) Publish(msg Message) {
    ps.pubCh <- msg
}
pubsub.go
@@ -19,10 +19,11 @@
    Topic_Event = "event" //事件消息
)
func NewPublisher(url string,mode int) (PubSub,error) {
    return newPub(url)
func NewPublisher(url string,heartBeatUrl string,mode int) (PubSub,error) {
    return newPub(url,heartBeatUrl)
}
func NewSubscriber(url string,mode int,topics []string) (PubSub,error) {
    return newSub(url, topics)
//processId is process Identifier,unique
func NewSubscriber(url string,heartBeatUrl string,mode int,topics []string,processId string) (PubSub,error) {
    return newSub(url,heartBeatUrl, topics, processId)
}