| | |
| | | return pub,nil |
| | | } |
| | | |
// msgTopicAll is the key matchTopic returns when the subscriber supplied no
// topic filter, so every accepted message is tracked under this one key.
const msgTopicAll = "sub-msg-all-topic"
| | | |
| | | func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { |
| | | client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) |
| | | if err !=nil { |
| | |
| | | recvCh: make(chan Message,50), |
| | | surveyors: gopherdiscovery.NewStringSet(), |
| | | } |
| | | var receivedCache = make(map[string]string) |
| | | go func() { |
| | | peers, _ := client.Peers() |
| | | for msg := range peers { |
| | | // Check whether this message belongs to one of the wanted topics. |
| | | var recvMsg Message |
| | | if err := json.Unmarshal(msg, &recvMsg);err ==nil { |
| | | if matchTopic(recvMsg.Topic, topics) { |
| | | sub.recvCh <- recvMsg |
| | | if b,matchedTopic := matchTopic(&recvMsg, topics);b { |
| | | if lastMsgId,ok := receivedCache[matchedTopic];ok { |
| | | if lastMsgId != recvMsg.Id { |
| | | receivedCache[matchedTopic] = recvMsg.Id |
| | | sub.recvCh <- recvMsg |
| | | } |
| | | } else { |
| | | receivedCache[matchedTopic] = recvMsg.Id |
| | | sub.recvCh <- recvMsg |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | return sub,nil |
| | | } |
| | | |
| | | func matchTopic(topic string,subTopics []string) bool { |
| | | if subTopics ==nil && len(subTopics) ==0 { |
| | | return true |
| | | func matchTopic(msg *Message,subTopics []string) (bool,string) { |
| | | if subTopics ==nil || len(subTopics) ==0 { |
| | | return true,msgTopicAll |
| | | } |
| | | for _,t := range subTopics { |
| | | if topic == t { |
| | | return true |
| | | if msg.Topic == t { |
| | | return true,msg.Topic |
| | | } |
| | | } |
| | | return false |
| | | return false,"" |
| | | } |
| | | |
| | | func (ps *mangosPubSub) Surveyor() []string { |