| | |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if data, peer, err := sock.RecvFrom(); err == nil { |
| | | var info *MsgInfo |
| | | if err := proto.Unmarshal(data, info); err == nil { |
| | | if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil { |
| | | var info MsgInfo |
| | | if err := proto.Unmarshal(data, &info); err == nil { |
| | | ch <- TransInfo{ |
| | | info: info, |
| | | info: &info, |
| | | port: peer, |
| | | } |
| | | } |
| | | } else { |
| | | // time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | |
| | | return handle |
| | | } |
| | | |
| | | const ( |
| | | timeoutSec = 1 |
| | | timeoutUsec = 0 |
| | | ) |
| | | |
| | | // GetTopicInfo get topic info |
| | | func (h *Handle) GetTopicInfo(topic, typ string) int { |
| | | // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key |
| | |
| | | } |
| | | if data, err := proto.Marshal(msg); err == nil { |
| | | h.mtxWorker.Lock() |
| | | if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil { |
| | | if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil { |
| | | h.mtxWorker.Unlock() |
| | | var rmsg *TopicInfoReply |
| | | if err := proto.Unmarshal(rdata, rmsg); err == nil { |
| | | var rmsg TopicInfoReply |
| | | if err := proto.Unmarshal(rdata, &rmsg); err == nil { |
| | | return int(rmsg.Key) |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { |
| | | if r := sc.sock.SendTo(data, sc.peer); r != 0 { |
| | | if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 { |
| | | return fmt.Errorf("%s SendTo Failed: %d", logID, r) |
| | | } |
| | | return nil |
| | |
| | | defer h.mtxWorker.Unlock() |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { |
| | | if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { |
| | | return fmt.Errorf("SendOnly Failed: %d", r) |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | // 同步接口,需要等待返回值 |
| | | var ret *MsgInfo |
| | | var ret MsgInfo |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-h.ctx.Done(): |
| | | return nil |
| | | default: |
| | | if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil { |
| | | if err := proto.Unmarshal(data, ret); err == nil { |
| | | if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil { |
| | | if err := proto.Unmarshal(data, &ret); err == nil { |
| | | break loop |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return ret |
| | | return &ret |
| | | } |
| | | |
| | | // Reply request |
| | | func (h *Handle) Reply(key int, info *MsgInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | if r := h.sockRep.sock.SendTo(msg, key); r != 0 { |
| | | if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { |
| | | return fmt.Errorf("Reply Failed: %d", r) |
| | | } |
| | | } |