package softbus
|
|
import (
|
"context"
|
"fmt"
|
"sync"
|
"time"
|
|
"github.com/golang/protobuf/proto"
|
)
|
|
// RegKey is the fixed key that Register sends its registration request to.
const RegKey = 12

// Query type strings for topic lookups.
// NOTE(review): presumably these are the values passed as the typ argument of
// Handle.GetTopicInfo; confirm against callers.
const (
	// GetTopicInfoTypeTopic selects a topic lookup.
	GetTopicInfoTypeTopic = "gettopic"
	// GetTopicInfoTypeChannel selects a channel lookup.
	GetTopicInfoTypeChannel = "getchannel"
)
|
|
type sockServer struct {
|
sock *DgramSocket
|
info *ProcInfo
|
}
|
|
type sockClient struct {
|
sock *DgramSocket
|
peer int
|
}
|
|
// TransInfo 传输的数据和必要的记录
|
type TransInfo struct {
|
info *MsgInfo
|
port int
|
}
|
|
// Handle handler
|
/*
|
sockHB/sockPub/sockWorker可以使用一个socket
|
但是由于需要支持多线程且心跳/发布都是很重要的信息,单独一个socket处理
|
worker处理短时的发送
|
*/
|
type Handle struct {
|
ctx context.Context
|
wg *sync.WaitGroup
|
// 创建channel对应的reply,等待读取其中的内容,server
|
m map[string]*sockServer
|
// 创建reply服务Request函数
|
sockRep *sockServer
|
// 创建心跳连接,client,仅发送心跳信息
|
// 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送
|
sockHB *sockClient
|
// 创建更新主题连接,client,仅发送主题更新信息
|
// 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
|
sockPub *sockClient
|
// 创建订阅的socket
|
// 订阅的主题peer发送的消息
|
sockSub *sockClient
|
// 创建一个万能socket发送给任意server
|
sockWorker *sockClient
|
// 多线程
|
mtxWorker sync.Mutex
|
|
// GetMessge实现需要的缓存
|
chSub chan TransInfo
|
chReply chan TransInfo
|
}
|
|
// routineSub is the id Register passes to the subscription receive goroutine.
const routineSub = "sub"

// routineReply is the id Register passes to the reply receive goroutine.
const routineReply = "reply"
|
|
// 订阅消息或reply接收的Request请求消息,通过缓存用于GetMessage获取
|
func recvRoutine(ctx context.Context, sock *DgramSocket, wg *sync.WaitGroup, ch chan<- TransInfo, id string) {
|
for {
|
select {
|
case <-ctx.Done():
|
wg.Done()
|
return
|
default:
|
if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil {
|
var info MsgInfo
|
if err := proto.Unmarshal(data, &info); err == nil {
|
ch <- TransInfo{
|
info: &info,
|
port: peer,
|
}
|
|
// if id == routineReply {
|
// fmt.Println("repley server recv:", info)
|
// }
|
}
|
} else {
|
// time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
}
|
|
// Register firstly register to manager
|
func Register(ctx context.Context, info *RegisterInfo) *Handle {
|
m := make(map[string]*sockServer)
|
|
// 首先请求一堆key, 包括reply/sub/pub/topic/heartbeat
|
sockReg := OpenDgramSocket()
|
if sockReg == nil {
|
return nil
|
}
|
defer sockReg.Close()
|
|
var msg, rdata []byte
|
var err error
|
loop:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
|
if msg == nil {
|
if msg, err = proto.Marshal(info); err != nil {
|
time.Sleep(100 * time.Millisecond)
|
continue
|
}
|
}
|
if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
|
break loop
|
} else {
|
time.Sleep(100 * time.Millisecond)
|
}
|
}
|
}
|
|
// 得到key
|
var regReply RegisterInfoReply
|
if err := proto.Unmarshal(rdata, ®Reply); err != nil {
|
return nil
|
}
|
|
// 收发req/rep channel, server
|
// channels对应的server,都是reply
|
for _, v := range info.Channel {
|
if k, ok := regReply.ChannelKey[v]; ok {
|
s := OpenDgramSocket()
|
s.Bind(int(k))
|
m[v] = &sockServer{
|
sock: s,
|
info: info.ProcInfo,
|
}
|
}
|
}
|
|
wg := &sync.WaitGroup{}
|
|
// 创建sub/reply服务缓存
|
chSize := 5
|
chSub := make(chan TransInfo, chSize)
|
chReply := make(chan TransInfo, chSize)
|
|
// reply server, 服务Request
|
sockReply := OpenDgramSocket()
|
sockReply.Bind(int(regReply.ReplyKey))
|
// 启动接收线程
|
wg.Add(1)
|
go recvRoutine(ctx, sockReply, wg, chReply, routineReply)
|
repS := &sockServer{
|
sock: sockReply,
|
info: info.ProcInfo,
|
}
|
|
// heartbeat client, 单独使用一个socket发送心跳
|
sockHB := OpenDgramSocket()
|
hbC := &sockClient{
|
sock: sockHB,
|
peer: int(regReply.HeartbeatKey),
|
}
|
|
// pub client, 单独使用一个发布主题
|
sockPub := OpenDgramSocket()
|
pubC := &sockClient{
|
sock: sockPub,
|
peer: int(regReply.UpdateTopicKey),
|
}
|
|
// sub client, 共用一个socket
|
sockSub := OpenDgramSocket()
|
// sockSub.Bind(int(regReply.SubTopicKey))
|
// 订阅所有主题
|
for _, v := range info.SubTopic {
|
sockSub.Sub(v, int(regReply.SubTopicKey))
|
}
|
// 启动接收线程
|
wg.Add(1)
|
go recvRoutine(ctx, sockSub, wg, chSub, routineSub)
|
subC := &sockClient{
|
sock: sockSub,
|
peer: -1,
|
}
|
|
// 万能socket,仅作为客户端使用, 或者获取topic key, 保存topic服务器的key
|
sockW := OpenDgramSocket()
|
uniC := &sockClient{
|
sock: sockW,
|
peer: int(regReply.GetTopicKey),
|
}
|
handle := &Handle{
|
ctx: ctx,
|
wg: wg,
|
m: m,
|
sockHB: hbC,
|
sockPub: pubC,
|
sockSub: subC,
|
sockRep: repS,
|
sockWorker: uniC,
|
chSub: chSub,
|
chReply: chReply,
|
}
|
|
return handle
|
}
|
|
// Free free
|
func (h *Handle) Free() {
|
h.wg.Wait()
|
|
for _, v := range h.m {
|
v.sock.Close()
|
}
|
h.sockHB.sock.Close()
|
h.sockHB = nil
|
h.sockPub.sock.Close()
|
h.sockPub = nil
|
h.sockSub.sock.Close()
|
h.sockSub = nil
|
h.sockRep.sock.Close()
|
h.sockRep = nil
|
h.sockWorker.sock.Close()
|
h.sockWorker = nil
|
|
fmt.Println("Handle Safe Free")
|
}
|
|
// Timeout applied to a single send or send-and-receive attempt, split into a
// seconds part and a microseconds part.
const (
	timeoutSec  = 1 // seconds part
	timeoutUsec = 0 // microseconds part
)
|
|
// GetTopicInfo get topic info
|
func (h *Handle) GetTopicInfo(topic, typ string) int {
|
// 据说不更新,先用缓存,否则从manager请求key
|
// ***k
|
if v, ok := h.m[topic]; ok {
|
return v.sock.Port()
|
}
|
// 远程获取
|
msg := &TopicInfo{
|
Topic: topic,
|
TopicType: typ,
|
}
|
if data, err := proto.Marshal(msg); err == nil {
|
h.mtxWorker.Lock()
|
if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil {
|
h.mtxWorker.Unlock()
|
var rmsg TopicInfoReply
|
if err := proto.Unmarshal(rdata, &rmsg); err == nil {
|
return int(rmsg.Key)
|
}
|
}
|
h.mtxWorker.Unlock()
|
}
|
return -1
|
}
|
|
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
|
if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 {
|
return fmt.Errorf("%s SendTo Failed: %d", logID, r)
|
}
|
return nil
|
}
|
|
// HeartBeat hb
|
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockHB, msg, "HeartBeat")
|
}
|
return err
|
}
|
|
// SendOnly no recv
|
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
|
return fmt.Errorf("SendOnly Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// Pub func
|
func (h *Handle) Pub(info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockPub, msg, "Pub")
|
}
|
return err
|
}
|
|
// Request req sync
|
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
|
msg, err := proto.Marshal(info)
|
if err != nil {
|
return nil
|
}
|
|
// 同步接口,需要等待返回值
|
var ret MsgInfo
|
loop:
|
for {
|
select {
|
case <-h.ctx.Done():
|
return nil
|
default:
|
if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
|
if err := proto.Unmarshal(data, &ret); err == nil {
|
break loop
|
}
|
}
|
}
|
}
|
return &ret
|
}
|
|
// RequestWithTimeout req sync
|
func (h *Handle) RequestWithTimeout(key int, info *MsgInfo, timeout int) *MsgInfo {
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
|
msg, err := proto.Marshal(info)
|
if err != nil {
|
return nil
|
}
|
|
until := (float32)(timeout)
|
one := (float32)(timeoutSec) + ((float32)(timeoutUsec) / 1000000)
|
fc := until / one
|
|
count := (int)(fc)
|
|
try := 0
|
|
// 同步接口,需要等待返回值
|
var ret MsgInfo
|
loop:
|
for {
|
select {
|
case <-h.ctx.Done():
|
return nil
|
default:
|
if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil {
|
if err := proto.Unmarshal(data, &ret); err == nil {
|
break loop
|
} else {
|
try++
|
if try > count {
|
return nil
|
}
|
}
|
} else {
|
try++
|
if try > count {
|
return nil
|
}
|
}
|
}
|
}
|
return &ret
|
}
|
|
// Reply request
|
func (h *Handle) Reply(key int, info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 {
|
return fmt.Errorf("Reply Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// GetMesg get mesg for sub or reply
|
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
|
if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
|
return nil, nil, -1
|
}
|
|
if len(h.chSub) > 1 {
|
m := <-h.chSub
|
subMsg = m.info
|
}
|
|
if len(h.chReply) > 1 {
|
m := <-h.chReply
|
replyMsg = m.info
|
replyKey = m.port
|
}
|
return
|
}
|