| | |
| | | package pubsub |
| | | |
| | | import ( |
| | | "context" |
| | | "basic.com/valib/gopherdiscovery.git" |
| | | "encoding/json" |
| | | "fmt" |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/pub" |
| | | "nanomsg.org/go-mangos/protocol/sub" |
| | | "nanomsg.org/go-mangos/transport/ipc" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | "time" |
| | | ) |
| | | |
| | | type mangosPubSub struct { |
| | | url string |
| | | |
| | | ctx context.Context |
| | | |
| | | sock mangos.Socket |
| | | |
| | | pubCh chan []byte //publish msg chan |
| | | heartBeatUrl string |
| | | pubCh chan Message //publish msg chan |
| | | aliveNodes gopherdiscovery.StringSet |
| | | |
| | | recvCh chan Message //recv msg chan |
| | | } |
| | | |
| | | func newPub(url string) (*mangosPubSub,error) { |
| | | var sock mangos.Socket |
| | | |
| | | func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) { |
| | | var discoveryServer *gopherdiscovery.DiscoveryServer |
| | | var err error |
| | | var ( |
| | | defaultOpts = gopherdiscovery.Options{ |
| | | SurveyTime: 3 * time.Second, |
| | | //RecvDeadline: 3 * time.Second, |
| | | PollTime: 5 * time.Second, |
| | | } |
| | | ) |
| | | |
| | | sock, err = pub.NewSocket() |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts) |
| | | |
| | | err = sock.Listen(url) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | fmt.Println("err:",err) |
| | | pub := &mangosPubSub{ |
| | | url: url, |
| | | ctx: ctx, |
| | | sock: sock, |
| | | pubCh: make(chan []byte), |
| | | url: publishUrl, |
| | | heartBeatUrl: heartBeatUrl, |
| | | aliveNodes: gopherdiscovery.NewStringSet(), |
| | | pubCh: make(chan Message, 50), |
| | | } |
| | | var msgCache = make(map[string]Message) |
| | | //clientMsgCh := make(map[string]chan Message) |
| | | cacheNodes := gopherdiscovery.NewStringSet() |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | close(pub.pubCh) |
| | | cancel() |
| | | return |
| | | case msg := <-pub.pubCh: |
| | | err := pub.sock.Send(msg) |
| | | if err != nil { |
| | | fmt.Println("Error PUBLISH MSG to the socket:", err.Error()) |
| | | msgCache[msg.Topic] = msg |
| | | //if len(clientMsgCh) > 0 { |
| | | // for _, ch := range clientMsgCh { |
| | | // ch <- msg |
| | | // } |
| | | //} |
| | | if cacheNodes.Cardinality() >0 { |
| | | sendB, _ := json.Marshal(msg) |
| | | discoveryServer.PublishMsg(string(sendB)) |
| | | } |
| | | default: |
| | | nodeIds := discoveryServer.AliveNodes().ToSlice() |
| | | if len(nodeIds) >0 { |
| | | //for _,nodeId := range nodeIds { |
| | | // if _,ok := clientMsgCh[nodeId]; !ok { |
| | | // clientMsgCh[nodeId] = make(chan Message) |
| | | // } |
| | | //} |
| | | |
| | | if cacheNodes.Cardinality() == 0 { //第一次有上线的节点 |
| | | if len(msgCache) > 0 { |
| | | for _,cMsg := range msgCache { |
| | | sendB, _ := json.Marshal(cMsg) |
| | | discoveryServer.PublishMsg(string(sendB)) |
| | | } |
| | | } |
| | | } |
| | | cacheNodes = discoveryServer.AliveNodes() |
| | | } else { |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | }() |
| | | go func() { |
| | | |
| | | }() |
| | | return pub,nil |
| | | } |
| | | |
| | | func newSub(url string, topics []string) (*mangosPubSub,error) { |
| | | var sock mangos.Socket |
| | | var err error |
| | | |
| | | sock, err = sub.NewSocket() |
| | | if err != nil { |
| | | return nil, err |
| | | func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) { |
| | | client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId) |
| | | if err !=nil { |
| | | return nil,err |
| | | } |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | |
| | | err = sock.Dial(url) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | // subscribes to everything |
| | | err = sock.SetOption(mangos.OptionSubscribe, []byte("")) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | heartMsg := client.HeartBeatMsg() |
| | | _= <-heartMsg |
| | | fmt.Println("heat beat with server success") |
| | | sub := &mangosPubSub{ |
| | | url:url, |
| | | ctx: ctx, |
| | | sock: sock, |
| | | url:subcribeUrl, |
| | | heartBeatUrl: heartBeatUrl, |
| | | recvCh: make(chan Message,50), |
| | | } |
| | | |
| | | var msg []byte |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | close(sub.recvCh) |
| | | cancel() |
| | | return |
| | | default: |
| | | msg, err = sub.sock.Recv() |
| | | if err != nil { |
| | | fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error()) |
| | | } else { |
| | | //判断是否是想要的主题消息 |
| | | var recvMsg Message |
| | | if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil { |
| | | if matchTopic(recvMsg.Topic, topics) { |
| | | sub.recvCh <- recvMsg |
| | | } |
| | | } |
| | | peers, _ := client.Peers() |
| | | for msg := range peers { |
| | | //判断是否是想要的主题消息 |
| | | var recvMsg Message |
| | | if err := json.Unmarshal(msg, &recvMsg);err ==nil { |
| | | if matchTopic(recvMsg.Topic, topics) { |
| | | sub.recvCh <- recvMsg |
| | | } |
| | | } |
| | | } |
| | |
| | | return false |
| | | } |
| | | |
| | | func (ps *mangosPubSub) Publish(msg []byte) { |
| | | func (ps *mangosPubSub) Publish(msg Message) { |
| | | ps.pubCh <- msg |
| | | } |
| | | |