| | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "sync" |
| | | "time" |
| | | |
| | | "github.com/golang/protobuf/proto" |
| | |
| | | GetTopicInfoTypeChannel = "getchannel" |
| | | ) |
| | | |
| | | type subOReply struct { |
| | | type sockServer struct { |
| | | sock *DgramSocket |
| | | info *ProcInfo |
| | | } |
| | | |
| | | type sockRe struct { |
| | | type sockClient struct { |
| | | sock *DgramSocket |
| | | peer int |
| | | } |
| | |
| | | } |
| | | |
| | | // Handle handler |
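| | | /* |
| | | sockHB, sockPub and sockWorker could all share a single socket. |
| | | However, multi-threaded use must be supported, and heartbeat and publish |
| | | messages are both important, so each of them gets a dedicated socket. |
| | | The worker socket handles short-lived sends. |
| | | */ |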
| | | type Handle struct { |
| | | ctx context.Context |
| | | // 创建channel对应的reply,等待读取其中的内容,server |
| | | // 其中必须有一个作为Request函数的server |
| | | m map[string]*subOReply |
| | | // 创建心跳连接,client,仅发送心跳信息 |
| | | sockHB *sockRe |
| | | // 创建更新主题连接,client,仅发送主题更新信息 |
| | | sockUp *sockRe |
| | | // 创建订阅的socket |
| | | sockSub *subOReply |
| | | m map[string]*sockServer |
| | | // 创建reply服务Request函数 |
| | | sockRep *sockRe |
| | | sockRep *sockServer |
| | | // 创建心跳连接,client,仅发送心跳信息 |
| | | // 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送 |
| | | sockHB *sockClient |
| | | // 创建更新主题连接,client,仅发送主题更新信息 |
| | | // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理 |
| | | sockPub *sockClient |
| | | // 创建订阅的socket |
| | | // 订阅的主题发送的消息 |
| | | sockSub *sockClient |
| | | // 创建一个万能socket发送给任意server |
| | | sockWorker *sockRe |
| | | sockWorker *sockClient |
| | | // 多线程 |
| | | mtxWorker sync.Mutex |
| | | |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | |
| | | v.sock.Close() |
| | | } |
| | | h.sockHB.sock.Close() |
| | | h.sockUp.sock.Close() |
| | | h.sockPub.sock.Close() |
| | | h.sockSub.sock.Close() |
| | | h.sockRep.sock.Close() |
| | | h.sockWorker.sock.Close() |
| | |
| | | |
| | | // Register reg |
| | | func Register(ctx context.Context, info *RegisterInfo) *Handle { |
| | | m := make(map[string]*subOReply) |
| | | m := make(map[string]*sockServer) |
| | | |
| | | // 首先请求一堆key |
| | | sockReg := OpenDgramSocket() |
| | |
| | | return nil |
| | | } |
| | | |
| | | // 收发req/rep channel |
| | | // 收发req/rep channel, server |
| | | for _, v := range info.Channel { |
| | | if k, ok := regReply.ChannelKey[v]; ok { |
| | | s := OpenDgramSocket() |
| | | s.Bind(int(k)) |
| | | m[v] = &subOReply{ |
| | | m[v] = &sockServer{ |
| | | sock: s, |
| | | info: info.ProcInfo, |
| | | } |
| | |
| | | chSub := make(chan TransInfo, chSize) |
| | | chReply := make(chan TransInfo, chSize) |
| | | |
| | | // heartbeat使用一个socket |
| | | // reply使用一个,服务Request, server |
| | | sockReply := OpenDgramSocket() |
| | | sockReply.Bind(int(regReply.ReplyKey)) |
| | | // 启动接收线程 |
| | | go recvRoutine(ctx, sockReply, chSub) |
| | | repS := &sockServer{ |
| | | sock: sockReply, |
| | | info: info.ProcInfo, |
| | | } |
| | | |
| | | // heartbeat使用一个socket, client |
| | | sockHB := OpenDgramSocket() |
| | | hbr := &sockRe{ |
| | | hbC := &sockClient{ |
| | | sock: sockHB, |
| | | peer: int(regReply.HeartbeatKey), |
| | | } |
| | | // 更新主题使用一个 |
| | | // 发布主题使用一个, client |
| | | sockUp := OpenDgramSocket() |
| | | upr := &sockRe{ |
| | | pubC := &sockClient{ |
| | | sock: sockUp, |
| | | peer: int(regReply.UpdateTopicKey), |
| | | } |
| | | |
| | | // sub使用一个socket |
| | | // sub使用一个socket, client |
| | | sockSub := OpenDgramSocket() |
| | | // sockSub.Bind(int(regReply.SubTopicKey)) |
| | | // 订阅主题 |
| | |
| | | } |
| | | // 启动接收线程 |
| | | go recvRoutine(ctx, sockSub, chSub) |
| | | sub := &subOReply{ |
| | | subC := &sockClient{ |
| | | sock: sockSub, |
| | | info: info.ProcInfo, |
| | | } |
| | | // reply使用一个,服务Request |
| | | sockReply := OpenDgramSocket() |
| | | sockReply.Bind(int(regReply.ReplyKey)) |
| | | // 启动接收线程 |
| | | go recvRoutine(ctx, sockReply, chSub) |
| | | rer := &sockRe{ |
| | | sock: sockReply, |
| | | peer: -1, |
| | | } |
| | | |
| | | // 万能socket,仅作为客户端使用 |
| | | sockW := OpenDgramSocket() |
| | | swr := &sockRe{ |
| | | uniC := &sockClient{ |
| | | sock: sockW, |
| | | peer: -1, |
| | | } |
| | | handle := &Handle{ |
| | | ctx: ctx, |
| | | m: m, |
| | | sockHB: hbr, |
| | | sockUp: upr, |
| | | sockSub: sub, |
| | | sockRep: rer, |
| | | sockWorker: swr, |
| | | sockHB: hbC, |
| | | sockPub: pubC, |
| | | sockSub: subC, |
| | | sockRep: repS, |
| | | sockWorker: uniC, |
| | | chSub: chSub, |
| | | chReply: chReply, |
| | | } |
| | |
| | | if v, ok := h.m[topic]; ok { |
| | | return v.sock.Port() |
| | | } |
| | | // 远程获取 |
| | | msg := &TopicInfo{ |
| | | Topic: topic, |
| | | TopicType: typ, |
| | | } |
| | | if data, err := proto.Marshal(msg); err == nil { |
| | | h.mtxWorker.Lock() |
| | | if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil { |
| | | h.mtxWorker.Unlock() |
| | | var rmsg *TopicInfoReply |
| | | if err := proto.Unmarshal(rdata, rmsg); err == nil { |
| | | return int(rmsg.Key) |
| | | } |
| | | } |
| | | h.mtxWorker.Unlock() |
| | | } |
| | | return -1 |
| | | } |
| | | |
| | | func (h *Handle) send2(sr *sockRe, data []byte, logID string) error { |
| | | if r := sr.sock.SendTo(data, sr.peer); r != 0 { |
| | | func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { |
| | | if r := sc.sock.SendTo(data, sc.peer); r != 0 { |
| | | return fmt.Errorf("%s SendTo Failed: %d", logID, r) |
| | | } |
| | | return nil |
| | |
| | | |
| | | // SendOnly no recv |
| | | func (h *Handle) SendOnly(key int, info *MsgInfo) error { |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | if r := h.sockWorker.sock.SendTo(msg, key); r != 0 { |
| | |
| | | func (h *Handle) Pub(info *MsgInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | | if err == nil { |
| | | return h.send2(h.sockUp, msg, "Pub") |
| | | return h.send2(h.sockPub, msg, "Pub") |
| | | } |
| | | return err |
| | | } |
| | | |
| | | // Request req sync |
| | | func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo { |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | |
| | | msg, err := proto.Marshal(info) |
| | | if err != nil { |
| | | return nil |