zhangmeng
2020-07-30 1baf50119d7d19b276b132f6837e86b396f186ef
library.go
@@ -1,6 +1,7 @@
package softbus
import (
   "context"
   "fmt"
   "time"
@@ -8,86 +9,193 @@
)
const (
   // HeartbeatKey fixed key for hb to servermanager
   HeartbeatKey = 11
   // RegKey fixed key for hb to servermanager
   RegKey = 12
   // UpKey fixed key for update topic to servermanager
   UpKey = 13
   // GetTopicInfoTypeTopic topic
   GetTopicInfoTypeTopic = "gettopic"
   // GetTopicInfoTypeChannel channel
   GetTopicInfoTypeChannel = "getchannel"
)
type shmKeyAndProcInfo struct {
type subOReply struct {
   sock *DgramSocket
   info *ProcInfo
}
type sockRe struct {
   sock *DgramSocket
   peer int
}
// TransInfo 传输的数据和必要的记录
type TransInfo struct {
   info *MsgInfo
   port int
}
// Handle handler
type Handle struct {
   m          map[string]*shmKeyAndProcInfo
   sockWorker *DgramSocket
   ctx context.Context
   // 创建channel对应的reply,等待读取其中的内容,server
   // 其中必须有一个作为Request函数的server
   m map[string]*subOReply
   // 创建心跳连接,client,仅发送心跳信息
   sockHB *sockRe
   // 创建更新主题连接,client,仅发送主题更新信息
   sockUp *sockRe
   // 创建订阅的socket
   sockSub *subOReply
   // 创建reply服务Request函数
   sockRep *sockRe
   // 创建一个万能socket发送给任意server
   sockWorker *sockRe
   chSub   chan TransInfo
   chReply chan TransInfo
}
func garbageCollect(ctx context.Context, h *Handle) {
   <-ctx.Done()
   for _, v := range h.m {
      v.sock.Close()
   }
   h.sockHB.sock.Close()
   h.sockUp.sock.Close()
   h.sockSub.sock.Close()
   h.sockRep.sock.Close()
   h.sockWorker.sock.Close()
}
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
   for {
      select {
      case <-ctx.Done():
         return
      default:
         if data, peer, err := sock.RecvFrom(); err == nil {
            var info *MsgInfo
            if err := proto.Unmarshal(data, info); err == nil {
               ch <- TransInfo{
                  info: info,
                  port: peer,
               }
            }
         }
         time.Sleep(50 * time.Millisecond)
      }
   }
}
// Register reg
func Register(info *RegisterInfo) *Handle {
   m := make(map[string]*shmKeyAndProcInfo)
func Register(ctx context.Context, info *RegisterInfo) *Handle {
   m := make(map[string]*subOReply)
   // 首先请求一堆key
   sockReg := OpenDgramSocket()
   if sockReg == nil {
      return nil
   }
   defer sockReg.Close()
   var msg, rdata []byte
   var err error
loop:
   for {
      if msg == nil {
         if msg, err = proto.Marshal(info); err != nil {
            time.Sleep(100 * time.Millisecond)
            continue
      select {
      case <-ctx.Done():
         return nil
      default:
         if msg == nil {
            if msg, err = proto.Marshal(info); err != nil {
               time.Sleep(100 * time.Millisecond)
               continue
            }
         }
         if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
            break loop
         }
         time.Sleep(100 * time.Millisecond)
      }
   }
   // 得到key,赋值
   var regReply RegisterInfoReply
   if err := proto.Unmarshal(rdata, &regReply); err != nil {
      return nil
   }
   // 收发req/rep channel
   for _, v := range info.Channel {
      if k, ok := regReply.ChannelKey[v]; ok {
         s := OpenDgramSocket()
         s.Bind(int(k))
         m[v] = &subOReply{
            sock: s,
            info: info.ProcInfo,
         }
      }
      if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
         break
      }
      time.Sleep(100 * time.Millisecond)
   }
   sockReg.Close()
   // 得到key,赋值
   if rdata != nil {
   }
   // 只收不发
   for _, v := range info.Channel {
      s := OpenDgramSocket()
      m[v] = &shmKeyAndProcInfo{
         sock: s,
         info: info.ProcInfo,
      }
   }
   // pub/sub使用同一个socket
   pbs := OpenDgramSocket()
   for _, v := range info.PubTopic {
      m[v] = &shmKeyAndProcInfo{
         sock: pbs,
         info: info.ProcInfo,
      }
   chSize := 5
   chSub := make(chan TransInfo, chSize)
   chReply := make(chan TransInfo, chSize)
   // heartbeat使用一个socket
   sockHB := OpenDgramSocket()
   hbr := &sockRe{
      sock: sockHB,
      peer: int(regReply.HeartbeatKey),
   }
   for _, v := range info.SubTopic {
      m[v] = &shmKeyAndProcInfo{
         sock: pbs,
         info: info.ProcInfo,
      }
   // 更新主题使用一个
   sockUp := OpenDgramSocket()
   upr := &sockRe{
      sock: sockUp,
      peer: int(regReply.UpdateTopicKey),
   }
   s := OpenDgramSocket()
   return &Handle{
   // sub使用一个socket
   sockSub := OpenDgramSocket()
   sockSub.Bind(int(regReply.SubTopicKey))
   // 启动接收线程
   go recvRoutine(ctx, sockSub, chSub)
   sub := &subOReply{
      sock: sockSub,
      info: info.ProcInfo,
   }
   // reply使用一个,服务Request
   sockReply := OpenDgramSocket()
   sockReply.Bind(int(regReply.ReplyKey))
   // 启动接收线程
   go recvRoutine(ctx, sockReply, chSub)
   rer := &sockRe{
      sock: sockReply,
      peer: -1,
   }
   // 万能socket,仅作为客户端使用
   sockW := OpenDgramSocket()
   swr := &sockRe{
      sock: sockW,
      peer: -1,
   }
   handle := &Handle{
      ctx:        ctx,
      m:          m,
      sockWorker: s,
      sockHB:     hbr,
      sockUp:     upr,
      sockSub:    sub,
      sockRep:    rer,
      sockWorker: swr,
      chSub:      chSub,
      chReply:    chReply,
   }
   go garbageCollect(ctx, handle)
   return handle
}
// GetTopicInfo get topic info
@@ -98,8 +206,8 @@
   return -1
}
func (h *Handle) send2(data []byte, key int, logID string) error {
   if r := h.sockWorker.SendTo(data, key); r != 0 {
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
   if r := sr.sock.SendTo(data, sr.peer); r != 0 {
      return fmt.Errorf("%s SendTo Failed: %d", logID, r)
   }
   return nil
@@ -109,7 +217,7 @@
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
   msg, err := proto.Marshal(info)
   if err == nil {
      return h.send2(msg, HeartbeatKey, "HeartBeat")
      return h.send2(h.sockHB, msg, "HeartBeat")
   }
   return err
}
@@ -118,13 +226,70 @@
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
   msg, err := proto.Marshal(info)
   if err == nil {
      return h.send2(msg, key, "SendOnly/Pub")
      if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
         return fmt.Errorf("SendOnly Failed: %d", r)
      }
   }
   return err
}
// Pub func
func (h *Handle) Pub(info *MsgInfo) error {
   // return h.SendOnly(PubKey, info)
   return nil
   msg, err := proto.Marshal(info)
   if err == nil {
      return h.send2(h.sockUp, msg, "Pub")
   }
   return err
}
// Request req sync
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
   msg, err := proto.Marshal(info)
   if err != nil {
      return nil
   }
   // 同步接口,需要等待返回值
   var ret *MsgInfo
loop:
   for {
      select {
      case <-h.ctx.Done():
         return nil
      default:
         if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
            if err := proto.Unmarshal(data, ret); err == nil {
               break loop
            }
         }
         time.Sleep(100 * time.Millisecond)
      }
   }
   return ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
   msg, err := proto.Marshal(info)
   if err == nil {
      if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
         return fmt.Errorf("Reply Failed: %d", r)
      }
   }
   return err
}
// GetMesg get mesg for sub or reply
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
   if len(h.chSub) > 1 {
      m := <-h.chSub
      subMsg = m.info
   }
   if len(h.chReply) > 1 {
      m := <-h.chReply
      replyMsg = m.info
      replyKey = m.port
   }
   return
}