liuxiaolong
2020-01-15 445d8e4320e6785346e1f4ea6c789c084c8a5e90
mangos.go
@@ -8,10 +8,10 @@
)
type mangosPubSub struct {
   url string
   url          string
   heartBeatUrl string
   pubCh chan Message  //publish msg chan
   aliveNodes gopherdiscovery.StringSet
   pubCh        chan Message  //publish msg chan
   surveyors    gopherdiscovery.StringSet
   recvCh chan Message  //recv msg chan
}
@@ -30,16 +30,15 @@
   discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
   fmt.Println("err:",err)
   fmt.Println("newPub err:",err)
   pub := &mangosPubSub{
      url: publishUrl,
      url:          publishUrl,
      heartBeatUrl: heartBeatUrl,
      aliveNodes: gopherdiscovery.NewStringSet(),
      pubCh: make(chan Message, 50),
      surveyors:    gopherdiscovery.NewStringSet(),
      pubCh:        make(chan Message, 50),
   }
   var msgCache = make(map[string]Message)
   //clientMsgCh := make(map[string]chan Message)
   cacheNodes := gopherdiscovery.NewStringSet()
   go func() {
      for {
         select {
@@ -50,20 +49,23 @@
            //      ch <- msg
            //   }
            //}
            if cacheNodes.Cardinality() >0 {
            if pub.surveyors.Cardinality() >0 {
               sendB, _ := json.Marshal(msg)
               discoveryServer.PublishMsg(string(sendB))
            }
         default:
            nodeIds := discoveryServer.AliveNodes().ToSlice()
            if len(nodeIds) >0 {
               //for _,nodeId := range nodeIds {
               //   if _,ok := clientMsgCh[nodeId]; !ok {
               //      clientMsgCh[nodeId] = make(chan Message)
               //   }
               //}
            nodeIds := discoveryServer.AliveNodes()
               if cacheNodes.Cardinality() == 0 { //第一次有上线的节点
            //for _,nodeId := range nodeIds {
            //   if _,ok := clientMsgCh[nodeId]; !ok {
            //      clientMsgCh[nodeId] = make(chan Message)
            //   }
            //}
            removedNodes := pub.surveyors.Difference(nodeIds)
            addedNodes := nodeIds.Difference(pub.surveyors)
            if len(nodeIds.ToSlice()) >0 {
               if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管
                  fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
                  if len(msgCache) > 0 {
                     for _,cMsg := range msgCache {
                        sendB, _ := json.Marshal(cMsg)
@@ -71,19 +73,21 @@
                     }
                  }
               }
               cacheNodes = discoveryServer.AliveNodes()
            } else {
               cacheNodes = discoveryServer.AliveNodes()
               pub.surveyors = nodeIds
            } else {//订阅者全部阵亡
               pub.surveyors = nodeIds
               time.Sleep(10 * time.Millisecond)
            }
         }
      }
   }()
   go func() {
   }()
   return pub,nil
}
const (
   msgTopicAll = "sub-msg-all-topic"
)
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
   client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
@@ -97,15 +101,25 @@
      url:subcribeUrl,
      heartBeatUrl: heartBeatUrl,
      recvCh: make(chan Message,50),
      surveyors: gopherdiscovery.NewStringSet(),
   }
   var receivedCache = make(map[string]string)
   go func() {
      peers, _ := client.Peers()
      for msg := range peers {
         //判断是否是想要的主题消息
         var recvMsg Message
         if err := json.Unmarshal(msg, &recvMsg);err ==nil {
            if matchTopic(recvMsg.Topic, topics) {
               sub.recvCh <- recvMsg
            if b,matchedTopic := matchTopic(&recvMsg, topics);b {
               if lastMsgId,ok := receivedCache[matchedTopic];ok {
                  if lastMsgId != recvMsg.Id {
                     receivedCache[matchedTopic] = recvMsg.Id
                     sub.recvCh <- recvMsg
                  }
               } else {
                  receivedCache[matchedTopic] = recvMsg.Id
                  sub.recvCh <- recvMsg
               }
            }
         }
      }
@@ -114,16 +128,20 @@
   return sub,nil
}
func matchTopic(topic string,subTopics []string) bool {
   if subTopics ==nil && len(subTopics) ==0 {
      return true
func matchTopic(msg *Message,subTopics []string) (bool,string) {
   if subTopics ==nil || len(subTopics) ==0 {
      return true,msgTopicAll
   }
   for _,t := range subTopics {
      if topic == t {
         return true
      if msg.Topic == t {
         return true,msg.Topic
      }
   }
   return false
   return false,""
}
func (ps *mangosPubSub) Surveyor() []string {
   return ps.surveyors.ToSlice()
}
func (ps *mangosPubSub) Publish(msg Message) {