package softbus
|
|
import (
|
"context"
|
"fmt"
|
"time"
|
|
"github.com/golang/protobuf/proto"
|
)
|
|
const (
|
// RegKey fixed key for hb to servermanager
|
RegKey = 12
|
// GetTopicInfoTypeTopic topic
|
GetTopicInfoTypeTopic = "gettopic"
|
// GetTopicInfoTypeChannel channel
|
GetTopicInfoTypeChannel = "getchannel"
|
)
|
|
type subOReply struct {
|
sock *DgramSocket
|
info *ProcInfo
|
}
|
|
type sockRe struct {
|
sock *DgramSocket
|
peer int
|
}
|
|
// TransInfo 传输的数据和必要的记录
|
type TransInfo struct {
|
info *MsgInfo
|
port int
|
}
|
|
// Handle handler
|
type Handle struct {
|
ctx context.Context
|
// 创建channel对应的reply,等待读取其中的内容,server
|
// 其中必须有一个作为Request函数的server
|
m map[string]*subOReply
|
// 创建心跳连接,client,仅发送心跳信息
|
sockHB *sockRe
|
// 创建更新主题连接,client,仅发送主题更新信息
|
sockUp *sockRe
|
// 创建订阅的socket
|
sockSub *subOReply
|
// 创建reply服务Request函数
|
sockRep *sockRe
|
// 创建一个万能socket发送给任意server
|
sockWorker *sockRe
|
|
chSub chan TransInfo
|
chReply chan TransInfo
|
}
|
|
func garbageCollect(ctx context.Context, h *Handle) {
|
|
<-ctx.Done()
|
|
for _, v := range h.m {
|
v.sock.Close()
|
}
|
h.sockHB.sock.Close()
|
h.sockUp.sock.Close()
|
h.sockSub.sock.Close()
|
h.sockRep.sock.Close()
|
h.sockWorker.sock.Close()
|
}
|
|
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
if data, peer, err := sock.RecvFrom(); err == nil {
|
var info *MsgInfo
|
if err := proto.Unmarshal(data, info); err == nil {
|
ch <- TransInfo{
|
info: info,
|
port: peer,
|
}
|
}
|
}
|
}
|
}
|
}
|
|
// Register reg
|
func Register(ctx context.Context, info *RegisterInfo) *Handle {
|
m := make(map[string]*subOReply)
|
|
// 首先请求一堆key
|
sockReg := OpenDgramSocket()
|
if sockReg == nil {
|
return nil
|
}
|
defer sockReg.Close()
|
|
var msg, rdata []byte
|
var err error
|
loop:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
|
if msg == nil {
|
if msg, err = proto.Marshal(info); err != nil {
|
time.Sleep(100 * time.Millisecond)
|
continue
|
}
|
}
|
if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
|
break loop
|
}
|
}
|
}
|
|
// 得到key,赋值
|
var regReply RegisterInfoReply
|
if err := proto.Unmarshal(rdata, ®Reply); err != nil {
|
return nil
|
}
|
|
// 收发req/rep channel
|
for _, v := range info.Channel {
|
if k, ok := regReply.ChannelKey[v]; ok {
|
s := OpenDgramSocket()
|
s.Bind(int(k))
|
m[v] = &subOReply{
|
sock: s,
|
info: info.ProcInfo,
|
}
|
}
|
}
|
|
chSize := 5
|
chSub := make(chan TransInfo, chSize)
|
chReply := make(chan TransInfo, chSize)
|
|
// heartbeat使用一个socket
|
sockHB := OpenDgramSocket()
|
hbr := &sockRe{
|
sock: sockHB,
|
peer: int(regReply.HeartbeatKey),
|
}
|
// 更新主题使用一个
|
sockUp := OpenDgramSocket()
|
upr := &sockRe{
|
sock: sockUp,
|
peer: int(regReply.UpdateTopicKey),
|
}
|
|
// sub使用一个socket
|
sockSub := OpenDgramSocket()
|
sockSub.Bind(int(regReply.SubTopicKey))
|
// 启动接收线程
|
go recvRoutine(ctx, sockSub, chSub)
|
sub := &subOReply{
|
sock: sockSub,
|
info: info.ProcInfo,
|
}
|
// reply使用一个,服务Request
|
sockReply := OpenDgramSocket()
|
sockReply.Bind(int(regReply.ReplyKey))
|
// 启动接收线程
|
go recvRoutine(ctx, sockReply, chSub)
|
rer := &sockRe{
|
sock: sockReply,
|
peer: -1,
|
}
|
|
// 万能socket,仅作为客户端使用
|
sockW := OpenDgramSocket()
|
swr := &sockRe{
|
sock: sockW,
|
peer: -1,
|
}
|
handle := &Handle{
|
ctx: ctx,
|
m: m,
|
sockHB: hbr,
|
sockUp: upr,
|
sockSub: sub,
|
sockRep: rer,
|
sockWorker: swr,
|
chSub: chSub,
|
chReply: chReply,
|
}
|
|
go garbageCollect(ctx, handle)
|
|
return handle
|
}
|
|
// GetTopicInfo get topic info
|
func (h *Handle) GetTopicInfo(topic, typ string) int {
|
if v, ok := h.m[topic]; ok {
|
return v.sock.Port()
|
}
|
return -1
|
}
|
|
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
|
if r := sr.sock.SendTo(data, sr.peer); r != 0 {
|
return fmt.Errorf("%s SendTo Failed: %d", logID, r)
|
}
|
return nil
|
}
|
|
// HeartBeat hb
|
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockHB, msg, "HeartBeat")
|
}
|
return err
|
}
|
|
// SendOnly no recv
|
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
|
return fmt.Errorf("SendOnly Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// Pub func
|
func (h *Handle) Pub(info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockUp, msg, "Pub")
|
}
|
return err
|
}
|
|
// Request req sync
|
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
|
msg, err := proto.Marshal(info)
|
if err != nil {
|
return nil
|
}
|
|
// 同步接口,需要等待返回值
|
var ret *MsgInfo
|
loop:
|
for {
|
select {
|
case <-h.ctx.Done():
|
return nil
|
default:
|
if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
|
if err := proto.Unmarshal(data, ret); err == nil {
|
break loop
|
}
|
}
|
}
|
}
|
return ret
|
}
|
|
// Reply request
|
func (h *Handle) Reply(key int, info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
|
return fmt.Errorf("Reply Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// GetMesg get mesg for sub or reply
|
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
|
if len(h.chSub) > 1 {
|
m := <-h.chSub
|
subMsg = m.info
|
}
|
|
if len(h.chReply) > 1 {
|
m := <-h.chReply
|
replyMsg = m.info
|
replyKey = m.port
|
}
|
return
|
}
|