zhangmeng
2020-07-31 dba48754d9623a49b155e94c65341b773bf4eeef
library.go
@@ -3,6 +3,7 @@
import (
   "context"
   "fmt"
   "sync"
   "time"
   "github.com/golang/protobuf/proto"
@@ -17,12 +18,12 @@
   GetTopicInfoTypeChannel = "getchannel"
)
type subOReply struct {
type sockServer struct {
   sock *DgramSocket
   info *ProcInfo
}
type sockRe struct {
type sockClient struct {
   sock *DgramSocket
   peer int
}
@@ -34,21 +35,31 @@
}
// Handle handler
/*
sockHB/sockPub/sockWorker可以使用一个socket
但是由于需要支持多线程且心跳/发布都是很重要的信息,单独一个socket处理
worker处理短时的发送
*/
type Handle struct {
   ctx context.Context
   // 创建channel对应的reply,等待读取其中的内容,server
   // 其中必须有一个作为Request函数的server
   m map[string]*subOReply
   // 创建心跳连接,client,仅发送心跳信息
   sockHB *sockRe
   // 创建更新主题连接,client,仅发送主题更新信息
   sockUp *sockRe
   // 创建订阅的socket
   sockSub *subOReply
   m map[string]*sockServer
   // 创建reply服务Request函数
   sockRep *sockRe
   sockRep *sockServer
   // 创建心跳连接,client,仅发送心跳信息
   // 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送
   sockHB *sockClient
   // 创建更新主题连接,client,仅发送主题更新信息
   // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
   sockPub *sockClient
   // 创建订阅的socket
   // 订阅的主题发送的消息
   sockSub *sockClient
   // 创建一个万能socket发送给任意server
   sockWorker *sockRe
   sockWorker *sockClient
   // 多线程
   mtxWorker sync.Mutex
   chSub   chan TransInfo
   chReply chan TransInfo
@@ -62,7 +73,7 @@
      v.sock.Close()
   }
   h.sockHB.sock.Close()
   h.sockUp.sock.Close()
   h.sockPub.sock.Close()
   h.sockSub.sock.Close()
   h.sockRep.sock.Close()
   h.sockWorker.sock.Close()
@@ -89,7 +100,7 @@
// Register reg
func Register(ctx context.Context, info *RegisterInfo) *Handle {
   m := make(map[string]*subOReply)
   m := make(map[string]*sockServer)
   // 首先请求一堆key
   sockReg := OpenDgramSocket()
@@ -125,12 +136,12 @@
      return nil
   }
   // 收发req/rep channel
   // 收发req/rep channel, server
   for _, v := range info.Channel {
      if k, ok := regReply.ChannelKey[v]; ok {
         s := OpenDgramSocket()
         s.Bind(int(k))
         m[v] = &subOReply{
         m[v] = &sockServer{
            sock: s,
            info: info.ProcInfo,
         }
@@ -141,20 +152,29 @@
   chSub := make(chan TransInfo, chSize)
   chReply := make(chan TransInfo, chSize)
   // heartbeat使用一个socket
   // reply使用一个,服务Request, server
   sockReply := OpenDgramSocket()
   sockReply.Bind(int(regReply.ReplyKey))
   // 启动接收线程
   go recvRoutine(ctx, sockReply, chSub)
   repS := &sockServer{
      sock: sockReply,
      info: info.ProcInfo,
   }
   // heartbeat使用一个socket, client
   sockHB := OpenDgramSocket()
   hbr := &sockRe{
   hbC := &sockClient{
      sock: sockHB,
      peer: int(regReply.HeartbeatKey),
   }
   // 更新主题使用一个
   // 发布主题使用一个, client
   sockUp := OpenDgramSocket()
   upr := &sockRe{
   pubC := &sockClient{
      sock: sockUp,
      peer: int(regReply.UpdateTopicKey),
   }
   // sub使用一个socket
   // sub使用一个socket, client
   sockSub := OpenDgramSocket()
   // sockSub.Bind(int(regReply.SubTopicKey))
   // 订阅主题
@@ -163,34 +183,25 @@
   }
   // 启动接收线程
   go recvRoutine(ctx, sockSub, chSub)
   sub := &subOReply{
   subC := &sockClient{
      sock: sockSub,
      info: info.ProcInfo,
   }
   // reply使用一个,服务Request
   sockReply := OpenDgramSocket()
   sockReply.Bind(int(regReply.ReplyKey))
   // 启动接收线程
   go recvRoutine(ctx, sockReply, chSub)
   rer := &sockRe{
      sock: sockReply,
      peer: -1,
   }
   // 万能socket,仅作为客户端使用
   sockW := OpenDgramSocket()
   swr := &sockRe{
   uniC := &sockClient{
      sock: sockW,
      peer: -1,
   }
   handle := &Handle{
      ctx:        ctx,
      m:          m,
      sockHB:     hbr,
      sockUp:     upr,
      sockSub:    sub,
      sockRep:    rer,
      sockWorker: swr,
      sockHB:     hbC,
      sockPub:    pubC,
      sockSub:    subC,
      sockRep:    repS,
      sockWorker: uniC,
      chSub:      chSub,
      chReply:    chReply,
   }
@@ -207,11 +218,27 @@
   if v, ok := h.m[topic]; ok {
      return v.sock.Port()
   }
   // 远程获取
   msg := &TopicInfo{
      Topic:     topic,
      TopicType: typ,
   }
   if data, err := proto.Marshal(msg); err == nil {
      h.mtxWorker.Lock()
      if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
         h.mtxWorker.Unlock()
         var rmsg *TopicInfoReply
         if err := proto.Unmarshal(rdata, rmsg); err == nil {
            return int(rmsg.Key)
         }
      }
      h.mtxWorker.Unlock()
   }
   return -1
}
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
   if r := sr.sock.SendTo(data, sr.peer); r != 0 {
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
   if r := sc.sock.SendTo(data, sc.peer); r != 0 {
      return fmt.Errorf("%s SendTo Failed: %d", logID, r)
   }
   return nil
@@ -228,6 +255,8 @@
// SendOnly no recv
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   msg, err := proto.Marshal(info)
   if err == nil {
      if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
@@ -241,13 +270,16 @@
func (h *Handle) Pub(info *MsgInfo) error {
   msg, err := proto.Marshal(info)
   if err == nil {
      return h.send2(h.sockUp, msg, "Pub")
      return h.send2(h.sockPub, msg, "Pub")
   }
   return err
}
// Request req sync
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
   h.mtxWorker.Lock()
   defer h.mtxWorker.Unlock()
   msg, err := proto.Marshal(info)
   if err != nil {
      return nil