zhangmeng
2020-07-29 98c8caa28a02354c86ea5788c6d7a09e38147f79
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package softbus
 
import (
    "fmt"
    "time"
 
    "github.com/golang/protobuf/proto"
)
 
const (
    // HeartbeatKey fixed key for hb to servermanager
    HeartbeatKey = 11
    // RegKey fixed key for hb to servermanager
    RegKey = 12
    // UpKey fixed key for update topic to servermanager
    UpKey = 13
    // GetTopicInfoTypeTopic topic
    GetTopicInfoTypeTopic = "gettopic"
    // GetTopicInfoTypeChannel channel
    GetTopicInfoTypeChannel = "getchannel"
)
 
type shmKeyAndProcInfo struct {
    sock *DgramSocket
    info *ProcInfo
}
 
// Handle handler
type Handle struct {
    m          map[string]*shmKeyAndProcInfo
    sockWorker *DgramSocket
}
 
// Register reg
func Register(info *RegisterInfo) *Handle {
    m := make(map[string]*shmKeyAndProcInfo)
 
    // 首先请求一堆key
    sockReg := OpenDgramSocket()
    if sockReg == nil {
        return nil
    }
    var msg, rdata []byte
    var err error
    for {
        if msg == nil {
            if msg, err = proto.Marshal(info); err != nil {
                time.Sleep(100 * time.Millisecond)
                continue
            }
        }
        if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
            break
        }
        time.Sleep(100 * time.Millisecond)
    }
    sockReg.Close()
    // 得到key,赋值
    if rdata != nil {
 
    }
    // 只收不发
    for _, v := range info.Channel {
        s := OpenDgramSocket()
        m[v] = &shmKeyAndProcInfo{
            sock: s,
            info: info.ProcInfo,
        }
    }
 
    // pub/sub使用同一个socket
    pbs := OpenDgramSocket()
    for _, v := range info.PubTopic {
        m[v] = &shmKeyAndProcInfo{
            sock: pbs,
            info: info.ProcInfo,
        }
    }
    for _, v := range info.SubTopic {
        m[v] = &shmKeyAndProcInfo{
            sock: pbs,
            info: info.ProcInfo,
        }
    }
 
    s := OpenDgramSocket()
    return &Handle{
        m:          m,
        sockWorker: s,
    }
}
 
// GetTopicInfo get topic info
func (h *Handle) GetTopicInfo(topic, typ string) int {
    if v, ok := h.m[topic]; ok {
        return v.sock.Port()
    }
    return -1
}
 
func (h *Handle) send2(data []byte, key int, logID string) error {
    if r := h.sockWorker.SendTo(data, key); r != 0 {
        return fmt.Errorf("%s SendTo Failed: %d", logID, r)
    }
    return nil
}
 
// HeartBeat hb
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(msg, HeartbeatKey, "HeartBeat")
    }
    return err
}
 
// SendOnly no recv
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(msg, key, "SendOnly/Pub")
    }
    return err
}
 
// Pub func
func (h *Handle) Pub(info *MsgInfo) error {
    // return h.SendOnly(PubKey, info)
    return nil
}