package softbus import ( "context" "fmt" "sync" "time" "github.com/golang/protobuf/proto" ) const ( // RegKey fixed key for hb to servermanager RegKey = 12 // GetTopicInfoTypeTopic topic GetTopicInfoTypeTopic = "gettopic" // GetTopicInfoTypeChannel channel GetTopicInfoTypeChannel = "getchannel" ) type sockServer struct { sock *DgramSocket info *ProcInfo } type sockClient struct { sock *DgramSocket peer int } // TransInfo 传输的数据和必要的记录 type TransInfo struct { info *MsgInfo port int } // Handle handler /* sockHB/sockPub/sockWorker可以使用一个socket 但是由于需要支持多线程且心跳/发布都是很重要的信息,单独一个socket处理 worker处理短时的发送 */ type Handle struct { ctx context.Context // 创建channel对应的reply,等待读取其中的内容,server // 其中必须有一个作为Request函数的server m map[string]*sockServer // 创建reply服务Request函数 sockRep *sockServer // 创建心跳连接,client,仅发送心跳信息 // 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送 sockHB *sockClient // 创建更新主题连接,client,仅发送主题更新信息 // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理 sockPub *sockClient // 创建订阅的socket // 订阅的主题发送的消息 sockSub *sockClient // 创建一个万能socket发送给任意server sockWorker *sockClient // 多线程 mtxWorker sync.Mutex chSub chan TransInfo chReply chan TransInfo } func garbageCollect(ctx context.Context, h *Handle) { <-ctx.Done() for _, v := range h.m { v.sock.Close() } h.sockHB.sock.Close() h.sockPub.sock.Close() h.sockSub.sock.Close() h.sockRep.sock.Close() h.sockWorker.sock.Close() } func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) { for { select { case <-ctx.Done(): return default: if data, peer, err := sock.RecvFromTimeout(0, 10*1000); err == nil { var info MsgInfo if err := proto.Unmarshal(data, &info); err == nil { ch <- TransInfo{ info: &info, port: peer, } } } else { // time.Sleep(10 * time.Millisecond) } } } } // Register reg func Register(ctx context.Context, info *RegisterInfo) *Handle { m := make(map[string]*sockServer) // 首先请求一堆key sockReg := OpenDgramSocket() if sockReg == nil { return nil } defer sockReg.Close() var msg, rdata []byte var err error loop: for { select { case <-ctx.Done(): return nil default: if msg == nil { if msg, err = proto.Marshal(info); err != nil { time.Sleep(100 * time.Millisecond) continue } } if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil { break loop } } } // 得到key,赋值 var regReply RegisterInfoReply if err := proto.Unmarshal(rdata, ®Reply); err != nil { return nil } // 收发req/rep channel, server for _, v := range info.Channel { if k, ok := regReply.ChannelKey[v]; ok { s := OpenDgramSocket() s.Bind(int(k)) m[v] = &sockServer{ sock: s, info: info.ProcInfo, } } } chSize := 5 chSub := make(chan TransInfo, chSize) chReply := make(chan TransInfo, chSize) // reply使用一个,服务Request, server sockReply := OpenDgramSocket() sockReply.Bind(int(regReply.ReplyKey)) // 启动接收线程 go recvRoutine(ctx, sockReply, chSub) repS := &sockServer{ sock: sockReply, info: info.ProcInfo, } // heartbeat使用一个socket, client sockHB := OpenDgramSocket() hbC := &sockClient{ sock: sockHB, peer: int(regReply.HeartbeatKey), } // 发布主题使用一个, client sockUp := OpenDgramSocket() pubC := &sockClient{ sock: sockUp, peer: int(regReply.UpdateTopicKey), } // sub使用一个socket, client sockSub := OpenDgramSocket() // sockSub.Bind(int(regReply.SubTopicKey)) // 订阅主题 for _, v := range info.SubTopic { sockSub.Sub(v, int(regReply.SubTopicKey)) } // 启动接收线程 go recvRoutine(ctx, sockSub, chSub) subC := &sockClient{ sock: sockSub, peer: -1, } // 万能socket,仅作为客户端使用, 或者获取topic key sockW := OpenDgramSocket() uniC := &sockClient{ sock: sockW, peer: int(regReply.GetTopicKey), } handle := &Handle{ ctx: ctx, m: m, sockHB: hbC, sockPub: pubC, sockSub: subC, sockRep: repS, sockWorker: uniC, chSub: chSub, chReply: chReply, } go garbageCollect(ctx, handle) return handle } const ( timeoutSec = 1 timeoutUsec = 0 ) // GetTopicInfo get topic info func (h *Handle) GetTopicInfo(topic, typ string) int { // 据说不更新,先用缓存,否则需要新创建一个socket,来从manager请求key // ***k if v, ok := h.m[topic]; ok { return v.sock.Port() } // 远程获取 msg := &TopicInfo{ Topic: topic, TopicType: typ, } if data, err := proto.Marshal(msg); err == nil { h.mtxWorker.Lock() if rdata, err := h.sockWorker.sock.SendAndRecvTimeout(data, h.sockWorker.peer, timeoutSec, timeoutUsec); err == nil { h.mtxWorker.Unlock() var rmsg TopicInfoReply if err := proto.Unmarshal(rdata, &rmsg); err == nil { return int(rmsg.Key) } } h.mtxWorker.Unlock() } return -1 } func (h *Handle) send2(sc *sockClient, data []byte, logID string) error { if r := sc.sock.SendToTimeout(data, sc.peer, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("%s SendTo Failed: %d", logID, r) } return nil } // HeartBeat hb func (h *Handle) HeartBeat(info *HeartbeatInfo) error { msg, err := proto.Marshal(info) if err == nil { return h.send2(h.sockHB, msg, "HeartBeat") } return err } // SendOnly no recv func (h *Handle) SendOnly(key int, info *MsgInfo) error { h.mtxWorker.Lock() defer h.mtxWorker.Unlock() msg, err := proto.Marshal(info) if err == nil { if r := h.sockWorker.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("SendOnly Failed: %d", r) } } return err } // Pub func func (h *Handle) Pub(info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { return h.send2(h.sockPub, msg, "Pub") } return err } // Request req sync func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo { h.mtxWorker.Lock() defer h.mtxWorker.Unlock() msg, err := proto.Marshal(info) if err != nil { return nil } // 同步接口,需要等待返回值 var ret MsgInfo loop: for { select { case <-h.ctx.Done(): return nil default: if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil { if err := proto.Unmarshal(data, &ret); err == nil { break loop } } } } return &ret } // Reply request func (h *Handle) Reply(key int, info *MsgInfo) error { msg, err := proto.Marshal(info) if err == nil { if r := h.sockRep.sock.SendToTimeout(msg, key, timeoutSec, timeoutUsec); r != 0 { return fmt.Errorf("Reply Failed: %d", r) } } return err } // GetMesg get mesg for sub or reply func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) { if len(h.chSub) > 1 { m := <-h.chSub subMsg = m.info } if len(h.chReply) > 1 { m := <-h.chReply replyMsg = m.info replyKey = m.port } return }