| | |
| | | ctx context.Context |
| | | wg *sync.WaitGroup |
| | | // 创建channel对应的reply,等待读取其中的内容,server |
| | | // 其中必须有一个作为Request函数的server |
| | | m map[string]*sockServer |
| | | // 创建reply服务Request函数 |
| | | sockRep *sockServer |
| | |
| | | // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理 |
| | | sockPub *sockClient |
| | | // 创建订阅的socket |
| | | // 订阅的主题发送的消息 |
| | | // 订阅的主题peer发送的消息 |
| | | sockSub *sockClient |
| | | // 创建一个万能socket发送给任意server |
| | | sockWorker *sockClient |
| | | // 多线程 |
| | | mtxWorker sync.Mutex |
| | | |
| | | // GetMessge实现需要的缓存 |
| | | chSub chan TransInfo |
| | | chReply chan TransInfo |
| | | } |
| | |
| | | routineReply = "reply" |
| | | ) |
| | | |
| | | // 订阅消息或reply接收的Request请求消息,通过缓存用于GetMessage获取 |
| | | func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo, id string) { |
| | | for { |
| | | select { |
| | |
| | | } |
| | | } |
| | | |
| | | // Register reg |
| | | // Register firstly register to manager |
| | | func Register(ctx context.Context, info *RegisterInfo) *Handle { |
| | | m := make(map[string]*sockServer) |
| | | |
| | | // 首先请求一堆key |
| | | // 首先请求一堆key, 包括reply/sub/pub/topic/heartbeat |
| | | sockReg := OpenDgramSocket() |
| | | if sockReg == nil { |
| | | return nil |
| | |
| | | } |
| | | } |
| | | |
| | | // 得到key,赋值 |
| | | // 得到key |
| | | var regReply RegisterInfoReply |
| | | if err := proto.Unmarshal(rdata, ®Reply); err != nil { |
| | | return nil |
| | | } |
| | | |
| | | // 收发req/rep channel, server |
| | | // channels对应的server,都是reply |
| | | for _, v := range info.Channel { |
| | | if k, ok := regReply.ChannelKey[v]; ok { |
| | | s := OpenDgramSocket() |
| | |
| | | |
| | | wg := &sync.WaitGroup{} |
| | | |
| | | // 创建sub/reply服务缓存 |
| | | chSize := 5 |
| | | chSub := make(chan TransInfo, chSize) |
| | | chReply := make(chan TransInfo, chSize) |
| | | |
| | | // reply使用一个,服务Request, server |
| | | // reply server, 服务Request |
| | | sockReply := OpenDgramSocket() |
| | | sockReply.Bind(int(regReply.ReplyKey)) |
| | | // 启动接收线程 |
| | | |
| | | wg.Add(1) |
| | | go recvRoutine(ctx, sockReply, wg, chReply, routineReply) |
| | | repS := &sockServer{ |
| | |
| | | info: info.ProcInfo, |
| | | } |
| | | |
| | | // heartbeat使用一个socket, client |
| | | // heartbeat client, 单独使用一个socket发送心跳 |
| | | sockHB := OpenDgramSocket() |
| | | hbC := &sockClient{ |
| | | sock: sockHB, |
| | | peer: int(regReply.HeartbeatKey), |
| | | } |
| | | // 发布主题使用一个, client |
| | | sockUp := OpenDgramSocket() |
| | | |
| | | // pub client, 单独使用一个发布主题 |
| | | sockPub := OpenDgramSocket() |
| | | pubC := &sockClient{ |
| | | sock: sockUp, |
| | | sock: sockPub, |
| | | peer: int(regReply.UpdateTopicKey), |
| | | } |
| | | // sub使用一个socket, client |
| | | |
| | | // sub client, 共用一个socket |
| | | sockSub := OpenDgramSocket() |
| | | // sockSub.Bind(int(regReply.SubTopicKey)) |
| | | // 订阅主题 |
| | | // 订阅所有主题 |
| | | for _, v := range info.SubTopic { |
| | | sockSub.Sub(v, int(regReply.SubTopicKey)) |
| | | } |
| | |
| | | peer: -1, |
| | | } |
| | | |
| | | // 万能socket,仅作为客户端使用, 或者获取topic key |
| | | // 万能socket,仅作为客户端使用, 或者获取topic key, 保存topic服务器的key |
| | | sockW := OpenDgramSocket() |
| | | uniC := &sockClient{ |
| | | sock: sockW, |
| | |
| | | |
| | | // GetTopicInfo get topic info |
| | | func (h *Handle) GetTopicInfo(topic, typ string) int { |
| | | // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key |
| | | // 据说不更新,先用缓存,否则从manager请求key |
| | | // ***k |
| | | if v, ok := h.m[topic]; ok { |
| | | return v.sock.Port() |