liuxiaolong
2020-01-14 38eccdf006c8374e89b7b7bb816b2d4ce4b2a220
mangos.go
@@ -12,6 +12,7 @@
   heartBeatUrl string
   pubCh chan Message  //publish msg chan
   aliveNodes gopherdiscovery.StringSet
   clients map[string][]string
   recvCh chan Message  //recv msg chan
}
@@ -36,6 +37,7 @@
      heartBeatUrl: heartBeatUrl,
      aliveNodes: gopherdiscovery.NewStringSet(),
      pubCh: make(chan Message, 50),
      clients: make(map[string][]string),
   }
   var msgCache = make(map[string]Message)
   //clientMsgCh := make(map[string]chan Message)
@@ -55,15 +57,18 @@
               discoveryServer.PublishMsg(string(sendB))
            }
         default:
            nodeIds := discoveryServer.AliveNodes().ToSlice()
            if len(nodeIds) >0 {
               //for _,nodeId := range nodeIds {
               //   if _,ok := clientMsgCh[nodeId]; !ok {
               //      clientMsgCh[nodeId] = make(chan Message)
               //   }
               //}
            nodeIds := discoveryServer.AliveNodes()
               if cacheNodes.Cardinality() == 0 { //第一次有上线的节点
            //for _,nodeId := range nodeIds {
            //   if _,ok := clientMsgCh[nodeId]; !ok {
            //      clientMsgCh[nodeId] = make(chan Message)
            //   }
            //}
            removedNodes := cacheNodes.Difference(nodeIds)
            addedNodes := nodeIds.Difference(cacheNodes)
            if len(nodeIds.ToSlice()) >0 {
               if removedNodes.Cardinality() >0 || addedNodes.Cardinality() >0 { //节点有变化的时候,也需要发消息
                  fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
                  if len(msgCache) > 0 {
                     for _,cMsg := range msgCache {
                        sendB, _ := json.Marshal(cMsg)
@@ -71,17 +76,15 @@
                     }
                  }
               }
               cacheNodes = discoveryServer.AliveNodes()
            } else {
               cacheNodes = discoveryServer.AliveNodes()
               cacheNodes = nodeIds
            } else {//订阅者全部阵亡
               cacheNodes = nodeIds
               time.Sleep(10 * time.Millisecond)
            }
         }
      }
   }()
   go func() {
   }()
   return pub,nil
}