fix
liuxiaolong
2020-01-14 1949d086a58f062dd249a792f8bf70d4921a4ae3
fix
1个文件已修改
28 ■■■■■ 已修改文件
mangos.go 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mangos.go
@@ -8,10 +8,10 @@
)
// mangosPubSub bundles the addresses, message channels and node set used by
// the publish/subscribe code in this file.
//
// NOTE(review): this listing interleaves the pre-change and post-change lines
// of a diff, so url and pubCh appear twice and aliveNodes sits next to its
// replacement, surveyors — confirm against the real file before editing.
type mangosPubSub struct {
    url string
    url          string
    // heartBeatUrl is filled from the heartBeatUrl value at construction.
    heartBeatUrl string
    pubCh chan Message  // channel of messages waiting to be published
    aliveNodes gopherdiscovery.StringSet
    pubCh        chan Message  // channel of messages waiting to be published
    // surveyors is the string set that Surveyor() returns as a slice.
    surveyors    gopherdiscovery.StringSet
    recvCh chan Message  // channel of received messages
}
@@ -32,14 +32,13 @@
    fmt.Println("newPub err:",err)
    pub := &mangosPubSub{
        url: publishUrl,
        url:          publishUrl,
        heartBeatUrl: heartBeatUrl,
        aliveNodes: gopherdiscovery.NewStringSet(),
        pubCh: make(chan Message, 50),
        surveyors:    gopherdiscovery.NewStringSet(),
        pubCh:        make(chan Message, 50),
    }
    var msgCache = make(map[string]Message)
    //clientMsgCh := make(map[string]chan Message)
    cacheNodes := gopherdiscovery.NewStringSet()
    go func() {
        for {
            select {
@@ -50,7 +49,7 @@
                //        ch <- msg
                //    }
                //}
                if cacheNodes.Cardinality() >0 {
                if pub.surveyors.Cardinality() >0 {
                    sendB, _ := json.Marshal(msg)
                    discoveryServer.PublishMsg(string(sendB))
                }
@@ -62,8 +61,8 @@
                //        clientMsgCh[nodeId] = make(chan Message)
                //    }
                //}
                removedNodes := cacheNodes.Difference(nodeIds)
                addedNodes := nodeIds.Difference(cacheNodes)
                removedNodes := pub.surveyors.Difference(nodeIds)
                addedNodes := nodeIds.Difference(pub.surveyors)
                if len(nodeIds.ToSlice()) >0 {
                    if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管
                        fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
@@ -74,9 +73,9 @@
                            }
                        }
                    }
                    cacheNodes = nodeIds
                    pub.surveyors = nodeIds
                } else {//订阅者全部阵亡
                    cacheNodes = nodeIds
                    pub.surveyors = nodeIds
                    time.Sleep(10 * time.Millisecond)
                }
            }
@@ -98,6 +97,7 @@
        url:subcribeUrl,
        heartBeatUrl: heartBeatUrl,
        recvCh: make(chan Message,50),
        surveyors: gopherdiscovery.NewStringSet(),
    }
    go func() {
        peers, _ := client.Peers()
@@ -127,6 +127,10 @@
    return false
}
func (ps *mangosPubSub) Surveyor() []string {
    return ps.surveyors.ToSlice()
}
func (ps *mangosPubSub) Publish(msg Message) {
    ps.pubCh <- msg
}