liuxiaolong
2020-01-15 445d8e4320e6785346e1f4ea6c789c084c8a5e90
msg consume once
2个文件已修改
30 ■■■■ 已修改文件
mangos.go 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pubsub.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mangos.go
@@ -85,6 +85,10 @@
    return pub,nil
}
const (
    msgTopicAll = "sub-msg-all-topic"
)
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
    client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
    if err !=nil {
@@ -99,14 +103,23 @@
        recvCh: make(chan Message,50),
        surveyors: gopherdiscovery.NewStringSet(),
    }
    var receivedCache = make(map[string]string)
    go func() {
        peers, _ := client.Peers()
        for msg := range peers {
            //判断是否是想要的主题消息
            var recvMsg Message
            if err := json.Unmarshal(msg, &recvMsg);err ==nil {
                if matchTopic(recvMsg.Topic, topics) {
                    sub.recvCh <- recvMsg
                if b,matchedTopic := matchTopic(&recvMsg, topics);b {
                    if lastMsgId,ok := receivedCache[matchedTopic];ok {
                        if lastMsgId != recvMsg.Id {
                            receivedCache[matchedTopic] = recvMsg.Id
                            sub.recvCh <- recvMsg
                        }
                    } else {
                        receivedCache[matchedTopic] = recvMsg.Id
                        sub.recvCh <- recvMsg
                    }
                }
            }
        }
@@ -115,16 +128,16 @@
    return sub,nil
}
func matchTopic(topic string,subTopics []string) bool {
    if subTopics ==nil && len(subTopics) ==0 {
        return true
func matchTopic(msg *Message,subTopics []string) (bool,string) {
    if subTopics ==nil || len(subTopics) ==0 {
        return true,msgTopicAll
    }
    for _,t := range subTopics {
        if topic == t {
            return true
        if msg.Topic == t {
            return true,msg.Topic
        }
    }
    return false
    return false,""
}
func (ps *mangosPubSub) Surveyor() []string {
pubsub.go
@@ -10,6 +10,7 @@
}
type Message struct {
    Id string
    Topic string
    Msg []byte
}