zhangmeng
2020-08-26 9119ebea26a3dbf6cf3c5a0bcd87b79c7f0d1cee
library.go
@@ -44,7 +44,6 @@
   ctx context.Context
   wg  *sync.WaitGroup
   // 创建channel对应的reply,等待读取其中的内容,server
   // 其中必须有一个作为Request函数的server
   m map[string]*sockServer
   // 创建reply服务Request函数
   sockRep *sockServer
@@ -55,13 +54,14 @@
   // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
   sockPub *sockClient
   // 创建订阅的socket
   // 订阅的主题发送的消息
   // 订阅的主题peer发送的消息
   sockSub *sockClient
   // 创建一个万能socket发送给任意server
   sockWorker *sockClient
   // 多线程
   mtxWorker sync.Mutex
   // GetMessge实现需要的缓存
   chSub   chan TransInfo
   chReply chan TransInfo
}
@@ -71,6 +71,7 @@
   routineReply = "reply"
)
// 订阅消息或reply接收的Request请求消息,通过缓存用于GetMessage获取
func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo, id string) {
   for {
      select {
@@ -97,11 +98,11 @@
   }
}
// Register reg
// Register firstly register to manager
func Register(ctx context.Context, info *RegisterInfo) *Handle {
   m := make(map[string]*sockServer)
   // 首先请求一堆key
   // 首先请求一堆key, 包括reply/sub/pub/topic/heartbeat
   sockReg := OpenDgramSocket()
   if sockReg == nil {
      return nil
@@ -131,13 +132,14 @@
      }
   }
   // 得到key,赋值
   // 得到key
   var regReply RegisterInfoReply
   if err := proto.Unmarshal(rdata, &regReply); err != nil {
      return nil
   }
   // 收发req/rep channel, server
   // channels对应的server,都是reply
   for _, v := range info.Channel {
      if k, ok := regReply.ChannelKey[v]; ok {
         s := OpenDgramSocket()
@@ -151,15 +153,15 @@
   wg := &sync.WaitGroup{}
   // 创建sub/reply服务缓存
   chSize := 5
   chSub := make(chan TransInfo, chSize)
   chReply := make(chan TransInfo, chSize)
   // reply使用一个,服务Request, server
   // reply server, 服务Request
   sockReply := OpenDgramSocket()
   sockReply.Bind(int(regReply.ReplyKey))
   // 启动接收线程
   wg.Add(1)
   go recvRoutine(ctx, sockReply, wg, chReply, routineReply)
   repS := &sockServer{
@@ -167,22 +169,24 @@
      info: info.ProcInfo,
   }
   // heartbeat使用一个socket, client
   // heartbeat client, 单独使用一个socket发送心跳
   sockHB := OpenDgramSocket()
   hbC := &sockClient{
      sock: sockHB,
      peer: int(regReply.HeartbeatKey),
   }
   // 发布主题使用一个, client
   sockUp := OpenDgramSocket()
   // pub client, 单独使用一个发布主题
   sockPub := OpenDgramSocket()
   pubC := &sockClient{
      sock: sockUp,
      sock: sockPub,
      peer: int(regReply.UpdateTopicKey),
   }
   // sub使用一个socket, client
   // sub client, 共用一个socket
   sockSub := OpenDgramSocket()
   // sockSub.Bind(int(regReply.SubTopicKey))
   // 订阅主题
   // 订阅所有主题
   for _, v := range info.SubTopic {
      sockSub.Sub(v, int(regReply.SubTopicKey))
   }
@@ -194,7 +198,7 @@
      peer: -1,
   }
   // 万能socket,仅作为客户端使用, 或者获取topic key
   // 万能socket,仅作为客户端使用, 或者获取topic key, 保存topic服务器的key
   sockW := OpenDgramSocket()
   uniC := &sockClient{
      sock: sockW,
@@ -244,7 +248,7 @@
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
   // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key
   // 据说不更新,先用缓存,否则从manager请求key
   // ***k
   if v, ok := h.m[topic]; ok {
      return v.sock.Port()