package pubsub import ( "basic.com/valib/gopherdiscovery.git" "encoding/json" "fmt" "time" ) type mangosPubSub struct { url string heartBeatUrl string pubCh chan Message //publish msg chan surveyors gopherdiscovery.StringSet svr *gopherdiscovery.DiscoveryServer cli *gopherdiscovery.DiscoveryClient cliInfo map[string][]byte recvCh chan Message //recv msg chan } func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) { var discoveryServer *gopherdiscovery.DiscoveryServer var err error var ( defaultOpts = gopherdiscovery.Options{ SurveyTime: 1500 * time.Millisecond, PollTime: 2 * time.Second, } ) discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts) fmt.Println("newPub err:",err) pub := &mangosPubSub{ url: publishUrl, heartBeatUrl: heartBeatUrl, surveyors: gopherdiscovery.NewStringSet(), pubCh: make(chan Message, 50), svr: discoveryServer, } var msgCache = make(map[string]Message) //clientMsgCh := make(map[string]chan Message) go func() { for { select { case msg := <-pub.pubCh: msgCache[msg.Topic] = msg //if len(clientMsgCh) > 0 { // for _, ch := range clientMsgCh { // ch <- msg // } //} fmt.Println("<-pub.pubCh,pub.surveyors.Len:",pub.surveyors.Cardinality()) if pub.surveyors.Cardinality() >0 { sendB, _ := json.Marshal(msg) discoveryServer.PublishMsg(string(sendB)) } default: nodeIds := discoveryServer.AliveNodes() //for _,nodeId := range nodeIds { // if _,ok := clientMsgCh[nodeId]; !ok { // clientMsgCh[nodeId] = make(chan Message) // } //} removedNodes := pub.surveyors.Difference(nodeIds) addedNodes := nodeIds.Difference(pub.surveyors) if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管 fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes) if len(msgCache) > 0 { for _,cMsg := range msgCache { sendB, _ := json.Marshal(cMsg) discoveryServer.PublishMsg(string(sendB)) } } } pub.surveyors = nodeIds pub.cliInfo = discoveryServer.SvInfo() time.Sleep(10 * time.Millisecond) } } }() return pub,nil } const ( msgTopicAll = "sub-msg-all-topic" ) func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) if err !=nil { return nil,err } heartMsg := client.HeartBeatMsg() _= <-heartMsg fmt.Println("heat beat with server success") sub := &mangosPubSub{ url:subcribeUrl, heartBeatUrl: heartBeatUrl, recvCh: make(chan Message,50), surveyors: gopherdiscovery.NewStringSet(), cli: client, } var receivedCache = make(map[string]string) go func() { peers, _ := client.Peers() for msg := range peers { //判断是否是想要的主题消息 var recvMsg Message if err := json.Unmarshal(msg, &recvMsg);err ==nil { if b,matchedTopic := matchTopic(&recvMsg, topics);b { if lastMsgId,ok := receivedCache[matchedTopic];ok { if lastMsgId != recvMsg.Id { receivedCache[matchedTopic] = recvMsg.Id sub.recvCh <- recvMsg } } else { receivedCache[matchedTopic] = recvMsg.Id sub.recvCh <- recvMsg } } } } }() return sub,nil } func matchTopic(msg *Message,subTopics []string) (bool,string) { if subTopics ==nil || len(subTopics) ==0 { return true,msgTopicAll } for _,t := range subTopics { if msg.Topic == t { return true,msg.Topic } } return false,"" } func (ps *mangosPubSub) Surveyor() []string { return ps.surveyors.ToSlice() } func (ps *mangosPubSub) Publish(msg Message) { ps.pubCh <- msg } func (ps *mangosPubSub) Recv() chan Message { return ps.recvCh } func (ps *mangosPubSub) GetCliInfo() map[string][]byte { if ps.svr != nil { return ps.cliInfo } return nil } func (ps *mangosPubSub) SetResp(r []byte) { if ps.cli != nil { ps.cli.SetResp(r) } }