package pubsub

import (
	"context"
	"encoding/json"
	"fmt"

	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/pub"
	"nanomsg.org/go-mangos/protocol/sub"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
)

// mangosPubSub is one end of a mangos PUB/SUB pair. newPub fills in pubCh
// only and newSub fills in recvCh only; the other channel stays nil.
type mangosPubSub struct {
	url    string
	ctx    context.Context
	sock   mangos.Socket
	pubCh  chan []byte  // messages handed over by Publish, drained by the publish loop
	recvCh chan Message // decoded, topic-filtered messages produced by the subscribe loop
}

// newPub creates a PUB socket with the IPC and TCP transports registered,
// listens on url and starts a goroutine that sends everything written to
// pubCh. The socket is closed again if listening fails.
func newPub(url string) (*mangosPubSub, error) {
	sock, err := pub.NewSocket()
	if err != nil {
		return nil, err
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Listen(url); err != nil {
		// The socket is not handed to anyone on failure, so release it here.
		_ = sock.Close()
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	ps := &mangosPubSub{
		url:   url,
		ctx:   ctx,
		sock:  sock,
		pubCh: make(chan []byte),
	}
	go ps.publishLoop(ctx, cancel)
	return ps, nil
}

// publishLoop forwards messages from pubCh to the socket until ctx is done,
// then closes pubCh and releases the context. Send errors are printed and the
// loop carries on with the next message.
//
// NOTE(review): cancel is only held by this goroutine, so nothing in this
// file can actually cancel ctx — confirm whether a shutdown path exists
// elsewhere in the package.
func (ps *mangosPubSub) publishLoop(ctx context.Context, cancel context.CancelFunc) {
	defer cancel()
	for {
		select {
		case <-ctx.Done():
			close(ps.pubCh)
			return
		case msg := <-ps.pubCh:
			if err := ps.sock.Send(msg); err != nil {
				fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
			}
		}
	}
}

// newSub creates a SUB socket with the IPC and TCP transports registered,
// dials url, subscribes to every message at the socket level and starts a
// goroutine that decodes incoming messages and delivers those whose topic
// passes matchTopic(topic, topics) on recvCh. The socket is closed again if
// dialing or subscribing fails.
func newSub(url string, topics []string) (*mangosPubSub, error) {
	sock, err := sub.NewSocket()
	if err != nil {
		return nil, err
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err := sock.Dial(url); err != nil {
		_ = sock.Close()
		return nil, err
	}
	// An empty prefix subscribes to everything; topic filtering happens after
	// decoding, in subscribeLoop.
	if err := sock.SetOption(mangos.OptionSubscribe, []byte("")); err != nil {
		_ = sock.Close()
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	ps := &mangosPubSub{
		url:    url,
		ctx:    ctx,
		sock:   sock,
		recvCh: make(chan Message, 50),
	}
	go ps.subscribeLoop(ctx, cancel, topics)
	return ps, nil
}

// subscribeLoop reads raw messages from the socket, decodes them as JSON
// into Message and delivers the ones on a wanted topic to recvCh. It stops
// when ctx is done or the socket reports that it has been closed; on exit it
// closes recvCh and releases the context.
func (ps *mangosPubSub) subscribeLoop(ctx context.Context, cancel context.CancelFunc, topics []string) {
	// Deferred calls run last-in first-out: recvCh is closed, then cancel runs.
	defer cancel()
	defer close(ps.recvCh)

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// NOTE(review): Recv presumably blocks until a message arrives, so
		// cancellation is only noticed between messages — confirm against the
		// socket's receive-deadline setting.
		raw, err := ps.sock.Recv()
		if err != nil {
			fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
			if err == mangos.ErrClosed {
				// A closed socket never recovers; looping would only spin.
				return
			}
			continue
		}

		// Payloads that are not valid JSON for Message are dropped silently.
		var m Message
		if json.Unmarshal(raw, &m) != nil {
			continue
		}
		// Check whether the message is on one of the wanted topics.
		if !matchTopic(m.Topic, topics) {
			continue
		}

		// Do not stay blocked on a full recvCh once shutdown was requested.
		select {
		case ps.recvCh <- m:
		case <-ctx.Done():
			return
		}
	}
}
func matchTopic(topic string,subTopics []string) bool { if subTopics ==nil && len(subTopics) ==0 { return true } for _,t := range subTopics { if topic == t { return true } } return false } func (ps *mangosPubSub) Publish(msg []byte) { ps.pubCh <- msg } func (ps *mangosPubSub) Recv() chan Message { return ps.recvCh }