package softbus
|
|
import (
|
"context"
|
"fmt"
|
"sync"
|
"time"
|
|
"github.com/golang/protobuf/proto"
|
)
|
|
// RegKey is the fixed, well-known key of the server manager. Register sends
// its registration request to this key.
const RegKey = 12

// Values for the typ argument of Handle.GetTopicInfo.
const (
	// GetTopicInfoTypeTopic selects a topic lookup.
	GetTopicInfoTypeTopic = "gettopic"
	// GetTopicInfoTypeChannel selects a channel lookup.
	GetTopicInfoTypeChannel = "getchannel"
)
|
|
type sockServer struct {
|
sock *DgramSocket
|
info *ProcInfo
|
}
|
|
type sockClient struct {
|
sock *DgramSocket
|
peer int
|
}
|
|
// TransInfo 传输的数据和必要的记录
|
type TransInfo struct {
|
info *MsgInfo
|
port int
|
}
|
|
// Handle handler
|
/*
|
sockHB/sockPub/sockWorker可以使用一个socket
|
但是由于需要支持多线程且心跳/发布都是很重要的信息,单独一个socket处理
|
worker处理短时的发送
|
*/
|
type Handle struct {
|
ctx context.Context
|
// 创建channel对应的reply,等待读取其中的内容,server
|
// 其中必须有一个作为Request函数的server
|
m map[string]*sockServer
|
// 创建reply服务Request函数
|
sockRep *sockServer
|
// 创建心跳连接,client,仅发送心跳信息
|
// 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送
|
sockHB *sockClient
|
// 创建更新主题连接,client,仅发送主题更新信息
|
// 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
|
sockPub *sockClient
|
// 创建订阅的socket
|
// 订阅的主题发送的消息
|
sockSub *sockClient
|
// 创建一个万能socket发送给任意server
|
sockWorker *sockClient
|
// 多线程
|
mtxWorker sync.Mutex
|
|
chSub chan TransInfo
|
chReply chan TransInfo
|
}
|
|
func garbageCollect(ctx context.Context, h *Handle) {
|
|
<-ctx.Done()
|
|
for _, v := range h.m {
|
v.sock.Close()
|
}
|
h.sockHB.sock.Close()
|
h.sockPub.sock.Close()
|
h.sockSub.sock.Close()
|
h.sockRep.sock.Close()
|
h.sockWorker.sock.Close()
|
}
|
|
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
if data, peer, err := sock.RecvFrom(); err == nil {
|
var info *MsgInfo
|
if err := proto.Unmarshal(data, info); err == nil {
|
ch <- TransInfo{
|
info: info,
|
port: peer,
|
}
|
}
|
}
|
}
|
}
|
}
|
|
// Register reg
|
func Register(ctx context.Context, info *RegisterInfo) *Handle {
|
m := make(map[string]*sockServer)
|
|
// 首先请求一堆key
|
sockReg := OpenDgramSocket()
|
if sockReg == nil {
|
return nil
|
}
|
defer sockReg.Close()
|
|
var msg, rdata []byte
|
var err error
|
loop:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
|
if msg == nil {
|
if msg, err = proto.Marshal(info); err != nil {
|
time.Sleep(100 * time.Millisecond)
|
continue
|
}
|
}
|
if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
|
break loop
|
}
|
}
|
}
|
|
// 得到key,赋值
|
var regReply RegisterInfoReply
|
if err := proto.Unmarshal(rdata, ®Reply); err != nil {
|
return nil
|
}
|
|
// 收发req/rep channel, server
|
for _, v := range info.Channel {
|
if k, ok := regReply.ChannelKey[v]; ok {
|
s := OpenDgramSocket()
|
s.Bind(int(k))
|
m[v] = &sockServer{
|
sock: s,
|
info: info.ProcInfo,
|
}
|
}
|
}
|
|
chSize := 5
|
chSub := make(chan TransInfo, chSize)
|
chReply := make(chan TransInfo, chSize)
|
|
// reply使用一个,服务Request, server
|
sockReply := OpenDgramSocket()
|
sockReply.Bind(int(regReply.ReplyKey))
|
// 启动接收线程
|
go recvRoutine(ctx, sockReply, chSub)
|
repS := &sockServer{
|
sock: sockReply,
|
info: info.ProcInfo,
|
}
|
|
// heartbeat使用一个socket, client
|
sockHB := OpenDgramSocket()
|
hbC := &sockClient{
|
sock: sockHB,
|
peer: int(regReply.HeartbeatKey),
|
}
|
// 发布主题使用一个, client
|
sockUp := OpenDgramSocket()
|
pubC := &sockClient{
|
sock: sockUp,
|
peer: int(regReply.UpdateTopicKey),
|
}
|
// sub使用一个socket, client
|
sockSub := OpenDgramSocket()
|
// sockSub.Bind(int(regReply.SubTopicKey))
|
// 订阅主题
|
for _, v := range info.SubTopic {
|
sockSub.Sub(v, int(regReply.SubTopicKey))
|
}
|
// 启动接收线程
|
go recvRoutine(ctx, sockSub, chSub)
|
subC := &sockClient{
|
sock: sockSub,
|
peer: -1,
|
}
|
|
// 万能socket,仅作为客户端使用
|
sockW := OpenDgramSocket()
|
uniC := &sockClient{
|
sock: sockW,
|
peer: -1,
|
}
|
handle := &Handle{
|
ctx: ctx,
|
m: m,
|
sockHB: hbC,
|
sockPub: pubC,
|
sockSub: subC,
|
sockRep: repS,
|
sockWorker: uniC,
|
chSub: chSub,
|
chReply: chReply,
|
}
|
|
go garbageCollect(ctx, handle)
|
|
return handle
|
}
|
|
// GetTopicInfo get topic info
|
func (h *Handle) GetTopicInfo(topic, typ string) int {
|
// 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key
|
// ***k
|
if v, ok := h.m[topic]; ok {
|
return v.sock.Port()
|
}
|
// 远程获取
|
msg := &TopicInfo{
|
Topic: topic,
|
TopicType: typ,
|
}
|
if data, err := proto.Marshal(msg); err == nil {
|
h.mtxWorker.Lock()
|
if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
|
h.mtxWorker.Unlock()
|
var rmsg *TopicInfoReply
|
if err := proto.Unmarshal(rdata, rmsg); err == nil {
|
return int(rmsg.Key)
|
}
|
}
|
h.mtxWorker.Unlock()
|
}
|
return -1
|
}
|
|
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
|
if r := sc.sock.SendTo(data, sc.peer); r != 0 {
|
return fmt.Errorf("%s SendTo Failed: %d", logID, r)
|
}
|
return nil
|
}
|
|
// HeartBeat hb
|
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockHB, msg, "HeartBeat")
|
}
|
return err
|
}
|
|
// SendOnly no recv
|
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
|
return fmt.Errorf("SendOnly Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// Pub func
|
func (h *Handle) Pub(info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
return h.send2(h.sockPub, msg, "Pub")
|
}
|
return err
|
}
|
|
// Request req sync
|
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
|
msg, err := proto.Marshal(info)
|
if err != nil {
|
return nil
|
}
|
|
// 同步接口,需要等待返回值
|
var ret *MsgInfo
|
loop:
|
for {
|
select {
|
case <-h.ctx.Done():
|
return nil
|
default:
|
if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
|
if err := proto.Unmarshal(data, ret); err == nil {
|
break loop
|
}
|
}
|
}
|
}
|
return ret
|
}
|
|
// Reply request
|
func (h *Handle) Reply(key int, info *MsgInfo) error {
|
msg, err := proto.Marshal(info)
|
if err == nil {
|
if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
|
return fmt.Errorf("Reply Failed: %d", r)
|
}
|
}
|
return err
|
}
|
|
// GetMesg get mesg for sub or reply
|
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
|
if len(h.chSub) > 1 {
|
m := <-h.chSub
|
subMsg = m.info
|
}
|
|
if len(h.chReply) > 1 {
|
m := <-h.chReply
|
replyMsg = m.info
|
replyKey = m.port
|
}
|
return
|
}
|