package bhomeclient
|
|
import (
|
"basic.com/valib/bhomebus.git"
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"os"
|
"strconv"
|
"sync"
|
"time"
|
)
|
|
type sockServer struct {
|
sock *bhomebus.Socket
|
info *ProcInfo
|
}
|
|
type sockClient struct {
|
sock *bhomebus.Socket
|
peer int
|
}
|
|
type TransInfo struct {
|
info *MsgInfo
|
port int
|
}
|
|
type BHBus struct {
|
ctx context.Context
|
|
conf *Config
|
|
nodes []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
|
mtxNode sync.Mutex //访问节点主题表时,需要加锁
|
|
m map[string]*sockServer
|
|
wg *sync.WaitGroup
|
|
sockRep *sockServer //响应其他进程request的socket,server
|
|
sockHB *sockClient //维持心跳的socket,线程实时发送,需要单独处理
|
|
sockPub *sockClient //发布主题的socket,需要单独socket处理
|
|
sockSub *sockClient //订阅主题的socket,线程实时接收消息,需要单独处理
|
|
sockWorker *sockClient //发给任意的server,短暂的request client
|
mtxWorker sync.Mutex //SendAndRecv可能不是线程安全的
|
|
chSub chan TransInfo
|
chReply chan TransInfo
|
}
|
|
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
|
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
|
var data []byte
|
var key int
|
for {
|
select {
|
case <-ctx.Done():
|
logFn("recvRoutine ctx.Done")
|
wg.Done()
|
return
|
default:
|
if n := s.RecvfromTimeout(&data, &key, 10);n == 0 {
|
var info MsgInfo
|
if err := json.Unmarshal(data, &info);err == nil {
|
ch <- TransInfo{
|
info: &info,
|
port: key, //这个key在发布订阅模式中是bus的key,是个固定值,上层用不到
|
}
|
|
data = []byte{}
|
key = 0
|
}
|
} else {
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
}
|
|
//Register
|
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
|
handle := &BHBus{
|
conf: config,
|
m: make(map[string]*sockServer),
|
chSub: make(chan TransInfo, config.chSize),
|
chReply: make(chan TransInfo, config.chSize),
|
}
|
|
var err error
|
err = bhomebus.Init("libshm_queue.so")
|
if err != nil {
|
handle.printLog("Init so err:", err)
|
return nil, err
|
}
|
err = bhomebus.ShmInit(512)
|
if err != nil {
|
handle.printLog("shmInit size err:", err)
|
return nil, err
|
}
|
regSock := bhomebus.OpenSocket()
|
if regSock == nil {
|
handle.printLog("Open Socket ret Nil")
|
return nil, errors.New("OpenSocket ret Nil")
|
}
|
defer func() {
|
regSock.Close()
|
handle.printLog("regSock.CLose")
|
}()
|
|
var msg []byte
|
var regAddr []bhomebus.NetNode
|
var regR *RegisterReply //注册结果信息
|
//如果注册失败,就会一直尝试注册
|
loop:
|
for {
|
select {
|
case <-q:
|
handle.printLog("register <-q")
|
return nil,errors.New("ctx is done")
|
default:
|
if msg == nil {
|
rid, err := json.Marshal(*ri)
|
if err != nil {
|
handle.printLog("marshal registerInfo err:", err)
|
return nil, errors.New("marshal registerInfo err:"+err.Error())
|
}
|
s := MsgInfo{
|
SrcProc: ri.Proc,
|
MsgType: MesgType_ReqRep,
|
Topic: TOPIC_REGISTER,
|
Body: rid,
|
}
|
handle.printLog("register MsgInfo:", s)
|
dRegData,err := json.Marshal(s)
|
if err != nil {
|
handle.printLog("marshal deregister msg err:", err)
|
return nil, err
|
}
|
msg = dRegData
|
}
|
if regAddr == nil {
|
regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: handle.conf.regKey,
|
})
|
}
|
|
var rMsg []bhomebus.Mesg
|
n := regSock.Sendandrecv(regAddr, msg, &rMsg) //n代表成功发送的节点的个数
|
handle.printLog("regSock.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
|
if n == 1 && len(rMsg) == 1 {
|
var cr CommonReply
|
if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
|
handle.printLog("unmarshal regReply err:", err)
|
return nil, errors.New("unmarshal regReply err:"+err.Error())
|
} else {
|
if cr.Status == REPLY_SUCCESS {
|
var rr RegisterReply
|
if err = json.Unmarshal(cr.Body, &rr);err ==nil {
|
regR = &rr
|
break loop
|
} else {
|
handle.printLog("unmarshal RegisterReply err:", err)
|
}
|
|
} else {
|
handle.printLog("cr.Status:", cr.Status, "Desc:", cr.Desc)
|
}
|
|
}
|
} else {
|
time.Sleep(100 * time.Millisecond)
|
}
|
}
|
}
|
|
handle.printLog("register Reply:", *regR)
|
|
for _, v := range ri.Channel {
|
if k,ok := regR.ChannelKey[v];ok {
|
s := bhomebus.OpenSocket()
|
s.ForceBind(int(k))
|
handle.m[v] = &sockServer{
|
sock: s,
|
info: &ri.Proc,
|
}
|
}
|
}
|
|
//维持心跳的socket
|
sockHB := bhomebus.OpenSocket()
|
handle.sockHB = &sockClient{
|
sock: sockHB,
|
peer: int(regR.HeartbeatKey),
|
}
|
|
handle.wg = &sync.WaitGroup{}
|
|
if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
|
sockReply := bhomebus.OpenSocket()
|
sockReply.ForceBind(int(regR.ReplyKey))
|
handle.wg.Add(1)
|
//serve server reply
|
go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
|
handle.sockRep = &sockServer{
|
sock: sockReply,
|
info: &ri.Proc,
|
}
|
}
|
|
|
//发布消息的socket, pub是将消息发布到bus中,所以不需要指定key
|
sockPub := bhomebus.OpenSocket()
|
handle.sockPub = &sockClient{
|
sock: sockPub,
|
peer: -1,
|
}
|
|
//有订阅消息才需要启动协程接收消息
|
if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
|
//订阅消息的socket
|
sockSub := bhomebus.OpenSocket()
|
//订阅所有主题
|
for _,v := range ri.SubTopic {
|
sockSub.Sub(v)
|
}
|
|
//启动订阅信息接收
|
handle.wg.Add(1)
|
go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
|
handle.sockSub = &sockClient{
|
sock: sockSub,
|
peer: -1,
|
}
|
}
|
|
sockWorker := bhomebus.OpenSocket()
|
handle.sockWorker = &sockClient{
|
sock: sockWorker,
|
peer: int(regR.QueryTopicKey),
|
}
|
|
return handle, nil
|
}
|
|
//DeRegister
|
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
|
data, err := json.Marshal(*dri)
|
if err != nil {
|
return err
|
}
|
|
dRegData,err := json.Marshal(MsgInfo{
|
MsgType: "",
|
Topic: TOPIC_DEREGISTER,
|
Body: data,
|
})
|
if err != nil {
|
return err
|
}
|
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: h.conf.regKey,
|
})
|
var retMsg []bhomebus.Mesg
|
n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
|
if n == 0 {
|
return nil
|
}
|
h.printLog("DeRegister retMsg:", retMsg)
|
return fmt.Errorf("DeRegister n:%d", n)
|
}
|
|
func (h *BHBus) printLog(v ...interface{}) {
|
if h.conf.fnLog != nil {
|
h.conf.fnLog(v...)
|
}
|
}
|
|
//Release
|
func (h *BHBus) Free() {
|
h.printLog("call BHBus free")
|
h.wg.Wait()
|
h.printLog("h.wg.Wait done")
|
for _,v := range h.m {
|
v.sock.Close()
|
}
|
if h.sockRep != nil {
|
h.sockRep.sock.Close()
|
h.sockRep = nil
|
}
|
if h.sockHB != nil {
|
h.sockHB.sock.Close()
|
h.sockHB = nil
|
}
|
if h.sockPub != nil {
|
h.sockPub.sock.Close()
|
h.sockPub = nil
|
}
|
if h.sockSub != nil {
|
h.sockSub.sock.Close()
|
h.sockSub = nil
|
}
|
if h.sockWorker != nil {
|
h.sockWorker.sock.Close()
|
h.sockWorker = nil
|
}
|
|
h.printLog("BHBus Freed")
|
}
|
|
|
//HeartBeat send
|
func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
|
data, err := json.Marshal(*info)
|
if err == nil {
|
hbd,err := json.Marshal(MsgInfo{
|
SrcProc: info.Proc,
|
MsgType: MesgType_ReqRep,
|
Topic: TOPIC_HEARTBEAT,
|
Body: data,
|
})
|
if err != nil {
|
h.printLog("marshal heartbeat msgInfo err:", err)
|
return err
|
}
|
var rMsg []bhomebus.Mesg
|
hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: h.sockHB.peer,
|
})
|
//h.printLog("start send heartbeat")
|
n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n代表成功发送的节点的个数
|
//h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
|
|
if n > 0 {
|
return nil
|
} else {
|
return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
|
}
|
}
|
return err
|
}
|
|
//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
|
// n := s.sock.SendtoTimeout(data, s.peer, timeout)
|
// if n == 0 {
|
// return nil
|
// }
|
// return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
|
//}
|
|
//更新主题列表
|
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
|
h.mtxNode.Lock()
|
defer h.mtxNode.Unlock()
|
h.nodes = arr
|
}
|
|
//获取topic对应的key
|
//如果传了serverId不为空,则获取指定机器上的topic-key
|
//如果server为空,则获取所有节点上topic-key
|
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
|
h.mtxNode.Lock()
|
defer h.mtxNode.Unlock()
|
var nodes []bhomebus.NetNode
|
reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: h.sockWorker.peer,
|
})
|
reqD,err := json.Marshal(MsgInfo{
|
SrcProc: *srcProc,
|
MsgType: MesgType_ReqRep,
|
Topic: TOPIC_QUERYKEY,
|
Body: []byte(topic),
|
})
|
if err != nil {
|
return nil, fmt.Errorf("marshal req err:%s", err.Error())
|
}
|
var ret []bhomebus.Mesg
|
n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
|
if n > 0 {
|
var reply CommonReply
|
err = json.Unmarshal(ret[0].Data, &reply)
|
if err != nil {
|
h.printLog("unmarshal err:", err)
|
return nil, err
|
}
|
|
if reply.Status == REPLY_SUCCESS {
|
err = json.Unmarshal(reply.Body, &nodes)
|
if err == nil {
|
return nodes, nil
|
} else {
|
h.printLog("unmarshal err:", err, "nodes:", nodes)
|
return nil, fmt.Errorf("unmarshal reply.Body err:%s", err.Error())
|
}
|
} else {
|
h.printLog("reply status:", reply.Status, "desc:", reply.Desc, "body:", string(reply.Body))
|
return nil, fmt.Errorf("REPLY STATUS:%d", reply.Status)
|
}
|
} else {
|
return nil, fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
|
}
|
}
|
|
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (resp *MsgInfo, err error) {
|
//1.首先需要通过topic拿到本机对应的NetNode
|
rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
|
h.printLog("rNodes:", rNodes, "err:", err)
|
if err != nil {
|
return nil, err
|
}
|
//2.将请求返送到对应的server,并等待返回值
|
data, err := json.Marshal(*req)
|
h.printLog("marshal(*req) err:", err)
|
if err != nil {
|
return nil, err
|
}
|
var ret []bhomebus.Mesg
|
|
if n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs);n > 0 {
|
if len(ret) > 0 {
|
if err = json.Unmarshal(ret[0].Data, resp); err == nil {
|
return resp, nil
|
}
|
}
|
}
|
return nil, fmt.Errorf("request err")
|
}
|
|
func (h *BHBus) Reply(replyKey int, i MsgInfo) error {
|
data,err := json.Marshal(i)
|
if err != nil {
|
return err
|
}
|
|
n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
|
if n != 0 {
|
return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
|
}
|
return nil
|
}
|
|
|
//只发送请求,不需要应答.
|
//暴露在上层的,只有topic,没有key。
|
func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
|
data,err := json.Marshal(*arg)
|
if err != nil {
|
return err
|
}
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
|
if n != 0 {
|
return fmt.Errorf("sendOnly ret n:%d", n)
|
}
|
return nil
|
}
|
|
func (h *BHBus) RequestCenter(req *MsgInfo) (*CommonReply, error) {
|
data, err := json.Marshal(*req)
|
if err != nil {
|
return nil, err
|
}
|
rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
|
Key: KEY_QUERY,
|
})
|
h.mtxWorker.Lock()
|
defer h.mtxWorker.Unlock()
|
var ret []bhomebus.Mesg
|
n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
|
h.printLog("requestCenter n:", n, "len(ret):", len(ret))
|
if n > 0 && len(ret) > 0{
|
var cr CommonReply
|
if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
|
return &cr, nil
|
} else {
|
h.printLog("unmarshal to CommonReply err:", err)
|
}
|
}
|
return nil, fmt.Errorf("request center err")
|
}
|
|
|
//向主题通道中发布消息
|
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
|
data,err := json.Marshal(*msg)
|
if err == nil {
|
if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n == 0 {
|
return nil
|
} else {
|
return fmt.Errorf("pub err n:%d", n)
|
}
|
}
|
|
return err
|
}
|
|
//追加订阅的主题消息
|
func (h *BHBus) Sub(topics []string) {
|
if topics != nil {
|
for _,t := range topics {
|
h.sockSub.sock.Sub(t)
|
}
|
}
|
}
|
|
//注销订阅的主题
|
func (h *BHBus) DeSub(topics []string) {
|
if topics != nil {
|
for _,t := range topics {
|
if n := h.sockSub.sock.Desub(t); n != 0 {
|
h.printLog("DeSub topic:", t, " n:", n)
|
}
|
}
|
}
|
}
|
|
|
//获取sub 或者需要reply的消息
|
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
|
if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
|
return nil,nil, -1
|
}
|
if len(h.chSub) >1 {
|
m := <-h.chSub
|
subMsg = m.info
|
}
|
if len(h.chReply) > 1 {
|
m := <-h.chReply
|
replyMsg = m.info
|
replyKey = m.port
|
}
|
return
|
}
|