liuxiaolong
2020-01-14 f51eaaedb99d04d20500e3c7166b122940f2508d
mangos.go
@@ -1,116 +1,112 @@
package pubsub
import (
   "context"
   "basic.com/valib/gopherdiscovery.git"
   "encoding/json"
   "fmt"
   "nanomsg.org/go-mangos"
   "nanomsg.org/go-mangos/protocol/pub"
   "nanomsg.org/go-mangos/protocol/sub"
   "nanomsg.org/go-mangos/transport/ipc"
   "nanomsg.org/go-mangos/transport/tcp"
   "time"
)
type mangosPubSub struct {
   url string
   ctx  context.Context
   sock mangos.Socket
   pubCh chan []byte  //publish msg chan
   url          string
   heartBeatUrl string
   pubCh        chan Message  //publish msg chan
   surveyors    gopherdiscovery.StringSet
   recvCh chan Message  //recv msg chan
}
func newPub(url string) (*mangosPubSub,error) {
   var sock mangos.Socket
func newPub(publishUrl string,heartBeatUrl string) (*mangosPubSub,error) {
   var discoveryServer *gopherdiscovery.DiscoveryServer
   var err error
   var (
      defaultOpts = gopherdiscovery.Options{
         SurveyTime:   3 * time.Second,
         //RecvDeadline: 3 * time.Second,
         PollTime:     5 * time.Second,
      }
   )
   sock, err = pub.NewSocket()
   if err != nil {
      return nil, err
   }
   sock.AddTransport(ipc.NewTransport())
   sock.AddTransport(tcp.NewTransport())
   discoveryServer, err = gopherdiscovery.Server(heartBeatUrl, publishUrl, defaultOpts)
   err = sock.Listen(url)
   if err != nil {
      return nil, err
   }
   ctx, cancel := context.WithCancel(context.Background())
   fmt.Println("newPub err:",err)
   pub := &mangosPubSub{
      url: url,
      ctx: ctx,
      sock: sock,
      pubCh: make(chan []byte),
      url:          publishUrl,
      heartBeatUrl: heartBeatUrl,
      surveyors:    gopherdiscovery.NewStringSet(),
      pubCh:        make(chan Message, 50),
   }
   var msgCache = make(map[string]Message)
   //clientMsgCh := make(map[string]chan Message)
   go func() {
      for {
         select {
         case <-ctx.Done():
            close(pub.pubCh)
            cancel()
            return
         case msg := <-pub.pubCh:
            err := pub.sock.Send(msg)
            if err != nil {
               fmt.Println("Error PUBLISH MSG to the socket:", err.Error())
            msgCache[msg.Topic] = msg
            //if len(clientMsgCh) > 0 {
            //   for _, ch := range clientMsgCh {
            //      ch <- msg
            //   }
            //}
            if pub.surveyors.Cardinality() >0 {
               sendB, _ := json.Marshal(msg)
               discoveryServer.PublishMsg(string(sendB))
            }
         default:
            nodeIds := discoveryServer.AliveNodes()
            //for _,nodeId := range nodeIds {
            //   if _,ok := clientMsgCh[nodeId]; !ok {
            //      clientMsgCh[nodeId] = make(chan Message)
            //   }
            //}
            removedNodes := pub.surveyors.Difference(nodeIds)
            addedNodes := nodeIds.Difference(pub.surveyors)
            if len(nodeIds.ToSlice()) >0 {
               if addedNodes.Cardinality() >0 { //有新节点上线的时候,需要发一次消息,节点离线的时候不用管
                  fmt.Println("removedNodes:", removedNodes, "addedNodes:", addedNodes)
                  if len(msgCache) > 0 {
                     for _,cMsg := range msgCache {
                        sendB, _ := json.Marshal(cMsg)
                        discoveryServer.PublishMsg(string(sendB))
                     }
                  }
               }
               pub.surveyors = nodeIds
            } else {//订阅者全部阵亡
               pub.surveyors = nodeIds
               time.Sleep(10 * time.Millisecond)
            }
         }
      }
   }()
   return pub,nil
}
func newSub(url string, topics []string) (*mangosPubSub,error) {
   var sock mangos.Socket
   var err error
   sock, err = sub.NewSocket()
   if err != nil {
      return nil, err
func newSub(subcribeUrl string,heartBeatUrl string, topics []string,procId string) (*mangosPubSub,error) {
   client, err := gopherdiscovery.ClientWithSub(heartBeatUrl, subcribeUrl, procId)
   if err !=nil {
      return nil,err
   }
   sock.AddTransport(ipc.NewTransport())
   sock.AddTransport(tcp.NewTransport())
   err = sock.Dial(url)
   if err != nil {
      return nil, err
   }
   // subscribes to everything
   err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
   if err != nil {
      return nil, err
   }
   ctx, cancel := context.WithCancel(context.Background())
   heartMsg := client.HeartBeatMsg()
   _= <-heartMsg
   fmt.Println("heat beat with server success")
   sub := &mangosPubSub{
      url:url,
      ctx: ctx,
      sock: sock,
      url:subcribeUrl,
      heartBeatUrl: heartBeatUrl,
      recvCh: make(chan Message,50),
      surveyors: gopherdiscovery.NewStringSet(),
   }
   var msg []byte
   go func() {
      for {
         select {
         case <-ctx.Done():
            close(sub.recvCh)
            cancel()
            return
         default:
            msg, err = sub.sock.Recv()
            if err != nil {
               fmt.Println("Cannot SUBSCRIBE MSG,ERR:", err.Error())
            } else {
               //判断是否是想要的主题消息
               var recvMsg Message
               if unmarshlErr := json.Unmarshal(msg, &recvMsg);unmarshlErr ==nil {
                  if matchTopic(recvMsg.Topic, topics) {
                     sub.recvCh <- recvMsg
                  }
               }
      peers, _ := client.Peers()
      for msg := range peers {
         //判断是否是想要的主题消息
         var recvMsg Message
         if err := json.Unmarshal(msg, &recvMsg);err ==nil {
            if matchTopic(recvMsg.Topic, topics) {
               sub.recvCh <- recvMsg
            }
         }
      }
@@ -131,7 +127,11 @@
   return false
}
func (ps *mangosPubSub) Publish(msg []byte) {
func (ps *mangosPubSub) Surveyor() []string {
   return ps.surveyors.ToSlice()
}
func (ps *mangosPubSub) Publish(msg Message) {
   ps.pubCh <- msg
}