zhangmeng
2020-08-26 9119ebea26a3dbf6cf3c5a0bcd87b79c7f0d1cee
add log
1个文件已修改
34 ■■■■■ 已修改文件
library.go 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go
@@ -44,7 +44,6 @@
    ctx context.Context
    wg  *sync.WaitGroup
    // 创建channel对应的reply,等待读取其中的内容,server
    // 其中必须有一个作为Request函数的server
    m map[string]*sockServer
    // 创建reply服务Request函数
    sockRep *sockServer
@@ -55,13 +54,14 @@
    // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
    sockPub *sockClient
    // 创建订阅的socket
    // 订阅的主题发送的消息
    // 订阅的主题peer发送的消息
    sockSub *sockClient
    // 创建一个万能socket发送给任意server
    sockWorker *sockClient
    // 多线程
    mtxWorker sync.Mutex
    // GetMessge实现需要的缓存
    chSub   chan TransInfo
    chReply chan TransInfo
}
@@ -71,6 +71,7 @@
    routineReply = "reply"
)
// 订阅消息或reply接收的Request请求消息,通过缓存用于GetMessage获取
func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo, id string) {
    for {
        select {
@@ -97,11 +98,11 @@
    }
}
// Register reg
// Register firstly register to manager
func Register(ctx context.Context, info *RegisterInfo) *Handle {
    m := make(map[string]*sockServer)
    // 首先请求一堆key
    // 首先请求一堆key, 包括reply/sub/pub/topic/heartbeat
    sockReg := OpenDgramSocket()
    if sockReg == nil {
        return nil
@@ -131,13 +132,14 @@
        }
    }
    // 得到key,赋值
    // 得到key
    var regReply RegisterInfoReply
    if err := proto.Unmarshal(rdata, &regReply); err != nil {
        return nil
    }
    // 收发req/rep channel, server
    // channels对应的server,都是reply
    for _, v := range info.Channel {
        if k, ok := regReply.ChannelKey[v]; ok {
            s := OpenDgramSocket()
@@ -151,15 +153,15 @@
    wg := &sync.WaitGroup{}
    // 创建sub/reply服务缓存
    chSize := 5
    chSub := make(chan TransInfo, chSize)
    chReply := make(chan TransInfo, chSize)
    // reply使用一个,服务Request, server
    // reply server, 服务Request
    sockReply := OpenDgramSocket()
    sockReply.Bind(int(regReply.ReplyKey))
    // 启动接收线程
    wg.Add(1)
    go recvRoutine(ctx, sockReply, wg, chReply, routineReply)
    repS := &sockServer{
@@ -167,22 +169,24 @@
        info: info.ProcInfo,
    }
    // heartbeat使用一个socket, client
    // heartbeat client, 单独使用一个socket发送心跳
    sockHB := OpenDgramSocket()
    hbC := &sockClient{
        sock: sockHB,
        peer: int(regReply.HeartbeatKey),
    }
    // 发布主题使用一个, client
    sockUp := OpenDgramSocket()
    // pub client, 单独使用一个发布主题
    sockPub := OpenDgramSocket()
    pubC := &sockClient{
        sock: sockUp,
        sock: sockPub,
        peer: int(regReply.UpdateTopicKey),
    }
    // sub使用一个socket, client
    // sub client, 共用一个socket
    sockSub := OpenDgramSocket()
    // sockSub.Bind(int(regReply.SubTopicKey))
    // 订阅主题
    // 订阅所有主题
    for _, v := range info.SubTopic {
        sockSub.Sub(v, int(regReply.SubTopicKey))
    }
@@ -194,7 +198,7 @@
        peer: -1,
    }
    // 万能socket,仅作为客户端使用, 或者获取topic key
    // 万能socket,仅作为客户端使用, 或者获取topic key, 保存topic服务器的key
    sockW := OpenDgramSocket()
    uniC := &sockClient{
        sock: sockW,
@@ -244,7 +248,7 @@
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
    // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key
    // 据说不更新,先用缓存,否则从manager请求key
    // ***k
    if v, ok := h.m[topic]; ok {
        return v.sock.Port()