package pubsub import ( "basic.com/valib/gopherdiscovery.git" "encoding/json" "fmt" "time" ) type mangosPubSub struct { url string heartBeatUrl string pubCh chan Message //publish msg chan aliveNodes gopherdiscovery.StringSet recvCh chan Message //recv msg chan } func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) { var discoveryServer *gopherdiscovery.DiscoveryServer var err error var ( defaultOpts = gopherdiscovery.Options{ SurveyTime: 3 * time.Second, //RecvDeadline: 3 * time.Second, PollTime: 5 * time.Second, } ) discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts) fmt.Println("err:",err) pub := &mangosPubSub{ url: publishUrl, heartBeatUrl: heartBeatUrl, aliveNodes: gopherdiscovery.NewStringSet(), pubCh: make(chan Message, 50), } var msgCache = make(map[string]Message) //clientMsgCh := make(map[string]chan Message) cacheNodes := gopherdiscovery.NewStringSet() go func() { for { select { case msg := <-pub.pubCh: msgCache[msg.Topic] = msg //if len(clientMsgCh) > 0 { // for _, ch := range clientMsgCh { // ch <- msg // } //} if cacheNodes.Cardinality() >0 { sendB, _ := json.Marshal(msg) discoveryServer.PublishMsg(string(sendB)) } default: nodeIds := discoveryServer.AliveNodes().ToSlice() if len(nodeIds) >0 { //for _,nodeId := range nodeIds { // if _,ok := clientMsgCh[nodeId]; !ok { // clientMsgCh[nodeId] = make(chan Message) // } //} if cacheNodes.Cardinality() == 0 { //第一次有上线的节点 if len(msgCache) > 0 { for _,cMsg := range msgCache { sendB, _ := json.Marshal(cMsg) discoveryServer.PublishMsg(string(sendB)) } } } cacheNodes = discoveryServer.AliveNodes() } else { time.Sleep(10 * time.Millisecond) } } } }() go func() { }() return pub,nil } func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) if err !=nil { return nil,err } heartMsg := client.HeartBeatMsg() _= <-heartMsg fmt.Println("heat beat with server success") sub := &mangosPubSub{ url:subcribeUrl, heartBeatUrl: heartBeatUrl, recvCh: make(chan Message,50), } go func() { peers, _ := client.Peers() for msg := range peers { //判断是否是想要的主题消息 var recvMsg Message if err := json.Unmarshal(msg, &recvMsg);err ==nil { if matchTopic(recvMsg.Topic, topics) { sub.recvCh <- recvMsg } } } }() return sub,nil } func matchTopic(topic string,subTopics []string) bool { if subTopics ==nil && len(subTopics) ==0 { return true } for _,t := range subTopics { if topic == t { return true } } return false } func (ps *mangosPubSub) Publish(msg Message) { ps.pubCh <- msg } func (ps *mangosPubSub) Recv() chan Message { return ps.recvCh }