liuxiaolong
2020-01-15 445d8e4320e6785346e1f4ea6c789c084c8a5e90
mangos.go
@@ -85,6 +85,10 @@
   return pub,nil
}
const (
   msgTopicAll = "sub-msg-all-topic"
)
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
   client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
   if err !=nil {
@@ -99,14 +103,23 @@
      recvCh: make(chan Message,50),
      surveyors: gopherdiscovery.NewStringSet(),
   }
   var receivedCache = make(map[string]string)
   go func() {
      peers, _ := client.Peers()
      for msg := range peers {
         //判断是否是想要的主题消息
         var recvMsg Message
         if err := json.Unmarshal(msg, &recvMsg);err ==nil {
            if matchTopic(recvMsg.Topic, topics) {
               sub.recvCh <- recvMsg
            if b,matchedTopic := matchTopic(&recvMsg, topics);b {
               if lastMsgId,ok := receivedCache[matchedTopic];ok {
                  if lastMsgId != recvMsg.Id {
                     receivedCache[matchedTopic] = recvMsg.Id
                     sub.recvCh <- recvMsg
                  }
               } else {
                  receivedCache[matchedTopic] = recvMsg.Id
                  sub.recvCh <- recvMsg
               }
            }
         }
      }
@@ -115,16 +128,16 @@
   return sub,nil
}
func matchTopic(topic string,subTopics []string) bool {
   if subTopics ==nil && len(subTopics) ==0 {
      return true
func matchTopic(msg *Message,subTopics []string) (bool,string) {
   if subTopics ==nil || len(subTopics) ==0 {
      return true,msgTopicAll
   }
   for _,t := range subTopics {
      if topic == t {
         return true
      if msg.Topic == t {
         return true,msg.Topic
      }
   }
   return false
   return false,""
}
func (ps *mangosPubSub) Surveyor() []string {