package pubsub
|
|
import (
|
"context"
|
"encoding/json"
|
"fmt"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/pub"
|
"nanomsg.org/go-mangos/protocol/sub"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
type mangosPubSub struct {
|
url string
|
|
ctx context.Context
|
|
sock mangos.Socket
|
|
pubCh chan []byte //publish msg chan
|
|
recvCh chan Message //recv msg chan
|
}
|
|
func newPub(url string) (*mangosPubSub,error) {
|
var sock mangos.Socket
|
var err error
|
|
sock, err = pub.NewSocket()
|
if err != nil {
|
return nil, err
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
|
err = sock.Listen(url)
|
if err != nil {
|
return nil, err
|
}
|
ctx, cancel := context.WithCancel(context.Background())
|
pub := &mangosPubSub{
|
url: url,
|
ctx: ctx,
|
sock: sock,
|
pubCh: make(chan []byte),
|
}
|
go func() {
|
for {
|
select {
|
case <-ctx.Done():
|
close(pub.pubCh)
|
cancel()
|
return
|
case msg := <-pub.pubCh:
|
err := pub.sock.Send(msg)
|
if err != nil {
|
fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
|
}
|
}
|
}
|
}()
|
return pub,nil
|
}
|
|
func newSub(url string, topics []string) (*mangosPubSub,error) {
|
var sock mangos.Socket
|
var err error
|
|
sock, err = sub.NewSocket()
|
if err != nil {
|
return nil, err
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
|
err = sock.Dial(url)
|
if err != nil {
|
return nil, err
|
}
|
// subscribes to everything
|
err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
|
if err != nil {
|
return nil, err
|
}
|
ctx, cancel := context.WithCancel(context.Background())
|
sub := &mangosPubSub{
|
url:url,
|
ctx: ctx,
|
sock: sock,
|
recvCh: make(chan Message,50),
|
}
|
|
var msg []byte
|
go func() {
|
for {
|
select {
|
case <-ctx.Done():
|
close(sub.recvCh)
|
cancel()
|
return
|
default:
|
msg, err = sub.sock.Recv()
|
if err != nil {
|
fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
|
} else {
|
//判断是否是想要的主题消息
|
var recvMsg Message
|
if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
|
if matchTopic(recvMsg.Topic, topics) {
|
sub.recvCh <- recvMsg
|
}
|
}
|
}
|
}
|
}
|
}()
|
|
return sub,nil
|
}
|
|
func matchTopic(topic string,subTopics []string) bool {
|
if subTopics ==nil && len(subTopics) ==0 {
|
return true
|
}
|
for _,t := range subTopics {
|
if topic == t {
|
return true
|
}
|
}
|
return false
|
}
|
|
func (ps *mangosPubSub) Publish(msg []byte) {
|
ps.pubCh <- msg
|
}
|
|
func (ps *mangosPubSub) Recv() chan Message {
|
return ps.recvCh
|
}
|