fix
liuxiaolong
2020-01-14 1949d086a58f062dd249a792f8bf70d4921a4ae3
mangos.go
@@ -8,10 +8,10 @@
)
type mangosPubSub struct {
   url string
   url          string
   heartBeatUrl string
   pubCh chan Message  //publish msg chan
   aliveNodes gopherdiscovery.StringSet
   pubCh        chan Message  //publish msg chan
   surveyors    gopherdiscovery.StringSet
   recvCh chan Message  //recv msg chan
}
@@ -32,14 +32,13 @@
   fmt.Println("newPub err:",err)
   pub := &mangosPubSub{
      url: publishUrl,
      url:          publishUrl,
      heartBeatUrl: heartBeatUrl,
      aliveNodes: gopherdiscovery.NewStringSet(),
      pubCh: make(chan Message, 50),
      surveyors:    gopherdiscovery.NewStringSet(),
      pubCh:        make(chan Message, 50),
   }
   var msgCache = make(map[string]Message)
   //clientMsgCh := make(map[string]chan Message)
   cacheNodes := gopherdiscovery.NewStringSet()
   go func() {
      for {
         select {
@@ -50,7 +49,7 @@
            //      ch <- msg
            //   }
            //}
            if cacheNodes.Cardinality() >0 {
            if pub.surveyors.Cardinality() >0 {
               sendB, _ := json.Marshal(msg)
               discoveryServer.PublishMsg(string(sendB))
            }
@@ -62,8 +61,8 @@
            //      clientMsgCh[nodeId] = make(chan Message)
            //   }
            //}
            removedNodes := cacheNodes.Difference(nodeIds)
            addedNodes := nodeIds.Difference(cacheNodes)
            removedNodes := pub.surveyors.Difference(nodeIds)
            addedNodes := nodeIds.Difference(pub.surveyors)
            if len(nodeIds.ToSlice()) >0 {
               if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管
                  fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
@@ -74,9 +73,9 @@
                     }
                  }
               }
               cacheNodes = nodeIds
               pub.surveyors = nodeIds
            } else {//订阅者全部阵亡
               cacheNodes = nodeIds
               pub.surveyors = nodeIds
               time.Sleep(10 * time.Millisecond)
            }
         }
@@ -98,6 +97,7 @@
      url:subcribeUrl,
      heartBeatUrl: heartBeatUrl,
      recvCh: make(chan Message,50),
      surveyors: gopherdiscovery.NewStringSet(),
   }
   go func() {
      peers, _ := client.Peers()
@@ -127,6 +127,10 @@
   return false
}
func (ps *mangosPubSub) Surveyor() []string {
   return ps.surveyors.ToSlice()
}
func (ps *mangosPubSub) Publish(msg Message) {
   ps.pubCh <- msg
}