zhangmeng
2020-08-03 0bde715af72b7b3d55ad3aac816d7cd153a60b42
library.go
@@ -85,14 +85,16 @@
      case <-ctx.Done():
         return
      default:
         if data, peer, err := sock.RecvFrom(); err == nil {
            var info *MsgInfo
            if err := proto.Unmarshal(data, info); err == nil {
         if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
            var info MsgInfo
            if err := proto.Unmarshal(data, &info); err == nil {
               ch <- TransInfo{
                  info: info,
                  info: &info,
                  port: peer,
               }
            }
         } else {
            // time.Sleep(10 * time.Millisecond)
         }
      }
   }
@@ -211,6 +213,11 @@
   return handle
}
const (
   timeoutSec  = 1
   timeoutUsec = 0
)
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
   // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key
@@ -225,10 +232,10 @@
   }
   if data, err := proto.Marshal(msg); err == nil {
      h.mtxWorker.Lock()
      if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
      if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
         h.mtxWorker.Unlock()
         var rmsg *TopicInfoReply
         if err := proto.Unmarshal(rdata, rmsg); err == nil {
         var rmsg TopicInfoReply
         if err := proto.Unmarshal(rdata, &rmsg); err == nil {
            return int(rmsg.Key)
         }
      }
@@ -238,7 +245,7 @@
}
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
   if r := sc.sock.SendTo(data, sc.peer); r != 0 {
   if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
      return fmt.Errorf("%s SendTo Failed: %d", logID, r)
   }
   return nil
@@ -259,7 +266,7 @@
   defer h.mtxWorker.Unlock()
   msg, err := proto.Marshal(info)
   if err == nil {
      if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
      if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
         return fmt.Errorf("SendOnly Failed: %d", r)
      }
   }
@@ -286,28 +293,28 @@
   }
   // 同步接口,需要等待返回值
   var ret *MsgInfo
   var ret MsgInfo
loop:
   for {
      select {
      case <-h.ctx.Done():
         return nil
      default:
         if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
            if err := proto.Unmarshal(data, ret); err == nil {
         if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
            if err := proto.Unmarshal(data, &ret); err == nil {
               break loop
            }
         }
      }
   }
   return ret
   return &ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
   msg, err := proto.Marshal(info)
   if err == nil {
      if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
      if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
         return fmt.Errorf("Reply Failed: %d", r)
      }
   }