package pubsub
|
|
import (
|
"basic.com/valib/gopherdiscovery.git"
|
"encoding/json"
|
"fmt"
|
"time"
|
)
|
|
type mangosPubSub struct {
|
url string
|
heartBeatUrl string
|
pubCh chan Message //publish msg chan
|
surveyors gopherdiscovery.StringSet
|
|
svr *gopherdiscovery.DiscoveryServer
|
cli *gopherdiscovery.DiscoveryClient
|
|
cliInfo map[string][]byte
|
|
recvCh chan Message //recv msg chan
|
}
|
|
|
func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
|
var discoveryServer *gopherdiscovery.DiscoveryServer
|
var err error
|
var (
|
defaultOpts = gopherdiscovery.Options{
|
SurveyTime: 1500 * time.Millisecond,
|
PollTime: 2 * time.Second,
|
}
|
)
|
|
discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
|
|
fmt.Println("newPub err:",err)
|
pub := &mangosPubSub{
|
url: publishUrl,
|
heartBeatUrl: heartBeatUrl,
|
surveyors: gopherdiscovery.NewStringSet(),
|
pubCh: make(chan Message, 50),
|
svr: discoveryServer,
|
}
|
var msgCache = make(map[string]Message)
|
//clientMsgCh := make(map[string]chan Message)
|
go func() {
|
for {
|
select {
|
case msg := <-pub.pubCh:
|
msgCache[msg.Topic] = msg
|
//if len(clientMsgCh) > 0 {
|
// for _, ch := range clientMsgCh {
|
// ch <- msg
|
// }
|
//}
|
fmt.Println("<-pub.pubCh,pub.surveyors.Len:",pub.surveyors.Cardinality())
|
if pub.surveyors.Cardinality() >0 {
|
sendB, _ := json.Marshal(msg)
|
discoveryServer.PublishMsg(string(sendB))
|
}
|
default:
|
nodeIds := discoveryServer.AliveNodes()
|
|
//for _,nodeId := range nodeIds {
|
// if _,ok := clientMsgCh[nodeId]; !ok {
|
// clientMsgCh[nodeId] = make(chan Message)
|
// }
|
//}
|
removedNodes := pub.surveyors.Difference(nodeIds)
|
addedNodes := nodeIds.Difference(pub.surveyors)
|
|
if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管
|
fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
|
if len(msgCache) > 0 {
|
for _,cMsg := range msgCache {
|
sendB, _ := json.Marshal(cMsg)
|
discoveryServer.PublishMsg(string(sendB))
|
}
|
}
|
}
|
|
pub.surveyors = nodeIds
|
pub.cliInfo = discoveryServer.SvInfo()
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}()
|
|
return pub,nil
|
}
|
|
const (
|
msgTopicAll = "sub-msg-all-topic"
|
)
|
|
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
|
client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
|
if err !=nil {
|
return nil,err
|
}
|
heartMsg := client.HeartBeatMsg()
|
_= <-heartMsg
|
fmt.Println("heat beat with server success")
|
sub := &mangosPubSub{
|
url:subcribeUrl,
|
heartBeatUrl: heartBeatUrl,
|
recvCh: make(chan Message,50),
|
surveyors: gopherdiscovery.NewStringSet(),
|
cli: client,
|
}
|
var receivedCache = make(map[string]string)
|
go func() {
|
peers, _ := client.Peers()
|
for msg := range peers {
|
//判断是否是想要的主题消息
|
var recvMsg Message
|
if err := json.Unmarshal(msg, &recvMsg);err ==nil {
|
if b,matchedTopic := matchTopic(&recvMsg, topics);b {
|
if lastMsgId,ok := receivedCache[matchedTopic];ok {
|
if lastMsgId != recvMsg.Id {
|
receivedCache[matchedTopic] = recvMsg.Id
|
sub.recvCh <- recvMsg
|
}
|
} else {
|
receivedCache[matchedTopic] = recvMsg.Id
|
sub.recvCh <- recvMsg
|
}
|
}
|
}
|
}
|
}()
|
|
return sub,nil
|
}
|
|
func matchTopic(msg *Message,subTopics []string) (bool,string) {
|
if subTopics ==nil || len(subTopics) ==0 {
|
return true,msgTopicAll
|
}
|
for _,t := range subTopics {
|
if msg.Topic == t {
|
return true,msg.Topic
|
}
|
}
|
return false,""
|
}
|
|
func (ps *mangosPubSub) Surveyor() []string {
|
return ps.surveyors.ToSlice()
|
}
|
|
func (ps *mangosPubSub) Publish(msg Message) {
|
ps.pubCh <- msg
|
}
|
|
func (ps *mangosPubSub) Recv() chan Message {
|
return ps.recvCh
|
}
|
|
func (ps *mangosPubSub) GetCliInfo() map[string][]byte {
|
if ps.svr != nil {
|
return ps.cliInfo
|
}
|
return nil
|
}
|
|
func (ps *mangosPubSub) SetResp(r []byte) {
|
if ps.cli != nil {
|
ps.cli.SetResp(r)
|
}
|
}
|