liuxiaolong
2021-04-25 c7069befa28a0f2594f0746044318a30d6989c19
使用bhsgo by lichao
1个文件已删除
4个文件已修改
910 ■■■■ 已修改文件
broker.go 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go 626 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.proto 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
message.go 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
micronode.go 175 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
broker.go
@@ -1,17 +1,17 @@
package bhomeclient
import "basic.com/valib/bhomebus.git"
import "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
type Broker interface {
    //发布到本机
    Publish(topic string, msg []byte) error
    Publish(string, []byte) error
    //发布到远程机器
    PublishNet(nodes []bhomebus.NetNode, topic string, msg []byte) error
    PublishNet([]bhome_msg.BHAddress, string, []byte) error
    //订阅一些主题,可动态新增
    Subscribe(topics []string)
    Subscribe([]string)
    //注销订阅的主题
    DeSub(topics []string)
    DeSub([]string)
}
hbusc.go
@@ -1,63 +1,45 @@
package bhomeclient
import (
    "basic.com/valib/bhomebus.git"
    "basic.com/valib/bhshmq.git/api/bhsgo"
    "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
    "unsafe"
)
type sockServer struct {
    sock *bhomebus.Socket
    info *ProcInfo
}
type sockClient struct {
    sock *bhomebus.Socket
    peer int
}
type TransInfo struct {
    info *MsgInfo
    port int
type MsgReq struct {
    ProcId         string
    bhome_msg.MsgRequestTopic
    Src         unsafe.Pointer
}
type BHBus struct {
    ctx context.Context
    ctx         context.Context
    conf *Config
    ri             *RegisterInfo
    nodes []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
    mtxNode sync.Mutex   //访问节点主题表时,需要加锁
    conf         *Config
    m map[string]*sockServer
    nodes         []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
    mtxNode     sync.Mutex   //访问节点主题表时,需要加锁
    wg *sync.WaitGroup
    wg             *sync.WaitGroup
    sockRep *sockServer  //响应其他进程request的socket,server
    sockHB *sockClient  //维持心跳的socket,线程实时发送,需要单独处理
    sockPub *sockClient  //发布主题的socket,需要单独socket处理
    sockSub *sockClient  //订阅主题的socket,线程实时接收消息,需要单独处理
    sockWorker     *sockClient  //发给任意的server,短暂的request client
    //mtxWorker     sync.Mutex     //SendAndRecv可能不是线程安全的
    chSub chan TransInfo
    chReply chan TransInfo
    ChSub   chan bhome_msg.MsgPublish
    ChReply chan MsgReq
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRoutine(ctx context.Context, s *bhomebus.Socket, wg *sync.WaitGroup, ch chan<-TransInfo, logFn func(...interface{})) {
    var data []byte
    var key int
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
    var procId string
    var msg bhome_msg.MsgRequestTopic
    var src unsafe.Pointer
    for {
        select {
        case <-ctx.Done():
@@ -65,20 +47,16 @@
            wg.Done()
            return
        default:
            n := s.RecvfromTimeout(&data, &key, 1000) //目前10001返回值表示超时
            if n == 0 {
                var info MsgInfo
                if err := json.Unmarshal(data, &info);err == nil {
                    ch <- TransInfo{
                        info: &info,
                        port: key,  //这个key在发布订阅模式中是bus的key,是个固定值,上层用不到
                    }
                    data = []byte{}
                    key = 0
                } else {
                    logFn("unmarshal to MsgInfo err:", err)
            if bhsgo.ReadRequest(&procId, &msg, &src, 100) {
                ch <- MsgReq{
                    procId,
                    msg,
                    src,
                }
                procId = ""
                msg.Reset()
                src = unsafe.Pointer(nil)
            } else {
                time.Sleep(100 * time.Millisecond)
            }
@@ -86,59 +64,22 @@
    }
}
//func recvandsendRoutine(h context.WrapperHandler, s *bhomebus.Socket, wg *sync.WaitGroup,serveFn func(rdata []byte, rkey int, sdata *[]byte) bool, logFn func(...interface{})) {
//    for {
//        select {
//        case <-ctx.Done():
//            logFn("recvandsendRoutine ctx.Done")
//            wg.Done()
//            return
//        default:
//            n := s.RecvandsendTimeout(1000, serveFn) //目前10001返回值表示超时
//            if n != 0 {
//                logFn("RecvandsendTimeout success")
//            } else {
//                //time.Sleep(100 * time.Millisecond)
//            }
//        }
//    }
//}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
    handle := &BHBus {
        ctx: ctx,
        conf: config,
        m: make(map[string]*sockServer),
        chSub: make(chan TransInfo, config.chSize),
        chReply: make(chan TransInfo, config.chSize),
        ctx:     ctx,
        conf:    config,
        ri:      ri,
        ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
        ChReply: make(chan MsgReq, config.chSize),
    }
    var err error
    err = bhomebus.Init("libshm_queue.so")
    if err != nil {
        handle.printLog("Init so err:", err)
        return nil, err
    }
    err = bhomebus.ShmInit(512)
    if err != nil {
        handle.printLog("shmInit size err:", err)
        return nil, err
    }
    regSock := bhomebus.OpenSocket()
    if regSock == nil {
        handle.printLog("Open Socket ret Nil")
        return nil, errors.New("OpenSocket ret Nil")
    }
    defer func() {
        regSock.Close()
        handle.printLog("regSock.CLose")
    }()
    var msg []byte
    var regAddr []bhomebus.NetNode
    var regR *RegisterReply  //注册结果信息
    //如果注册失败,就会一直尝试注册
    procI := bhome_msg.ProcInfo{
        ProcId: []byte(ri.Proc.ID),
        Name: []byte(ri.Proc.Name),
    }
    var regReply bhome_msg.MsgCommonReply
loop:
    for {
        select {
@@ -146,165 +87,91 @@
            handle.printLog("register <-q")
            return nil,errors.New("ctx is done")
        default:
            if msg == nil {
                rid, err := json.Marshal(*ri)
                if err != nil {
                    handle.printLog("marshal registerInfo err:", err)
                    return nil, errors.New("marshal registerInfo err:"+err.Error())
                }
                s := MsgInfo{
                    SrcProc: ri.Proc,
                    MsgType: MesgType_ReqRep,
                    Topic:   TOPIC_REGISTER,
                    Body:    rid,
                }
                handle.printLog("register MsgInfo:", s)
                dRegData,err := json.Marshal(s)
                if err != nil {
                    handle.printLog("marshal deregister msg err:", err)
                    return nil, err
                }
                handle.printLog(string(dRegData))
                msg = dRegData
            }
            if regAddr == nil {
                regAddr = append([]bhomebus.NetNode{}, bhomebus.NetNode{
                    Key: handle.conf.regKey,
                })
            }
            var rMsg []bhomebus.Mesg
            n := regSock.SendandrecvTimeout(regAddr, msg, &rMsg, handle.conf.sendTimeOut) //n代表成功发送的节点的个数
            handle.printLog("regSock.SendandrecvTimeout n:", n)
            if n == 1 && len(rMsg) == 1 {
                var cr Reply
                if err = json.Unmarshal(rMsg[0].Data, &cr);err != nil {
                    handle.printLog("unmarshal regReply err:", err)
                    return nil, errors.New("unmarshal regReply err:"+err.Error())
                } else {
                    if cr.Success {
                        if rpd,err := json.Marshal(cr.Data);err ==nil {
                            var rr RegisterReply
                            if err = json.Unmarshal(rpd, &rr); err == nil {
                                regR = &rr
                                break loop
                            } else {
                                handle.printLog("unmarshal RegisterReply err:", err)
                            }
                        } else {
                            handle.printLog("marshal cr.Data err:", err)
                        }
                    } else {
                        handle.printLog("cr:", cr)
                    }
                }
            if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
                break loop
            } else {
                time.Sleep(1 * time.Second)
                time.Sleep(time.Second)
            }
        }
    }
    handle.printLog("register Reply:", *regR)
    for _, v := range ri.Channel {
        if k,ok := regR.ChannelKey[v];ok {
            s := bhomebus.OpenSocket()
            s.ForceBind(int(k))
            handle.m[v] = &sockServer{
                sock: s,
                info: &ri.Proc,
    if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
        topics := bhome_msg.MsgTopicList{}
        var regTopicReply bhome_msg.MsgCommonReply
        for _,t := range ri.PubTopic {
            topics.TopicList = append(topics.TopicList, []byte(t))
        }
    loopRT:
        for {
            select {
            case <-q:
                handle.printLog("RegisterTopics recv quit signal")
                return nil, errors.New("RegisterTopics recv quit signal")
            default:
                if bhsgo.RegisterTopics(&topics, &regTopicReply, handle.conf.sendTimeOut) {
                    handle.printLog("bhsgo.RegisterTopics success!!")
                    break loopRT
                } else {
                    time.Sleep(time.Second)
                }
            }
        }
        handle.wg.Add(1)
        go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
    }
    //维持心跳的socket
    sockHB := bhomebus.OpenSocket()
    handle.printLog("open sockHB")
    handle.sockHB = &sockClient{
        sock: sockHB,
        peer: int(regR.HeartbeatKey),
    }
    handle.printLog("register done!" )
    handle.wg = &sync.WaitGroup{}
    if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
        sockReply := bhomebus.OpenSocket()
        sockReply.ForceBind(int(regR.ReplyKey))
        handle.printLog("after pubTopic forceBind")
        handle.wg.Add(1)
        //serve server reply
        go recvRoutine(ctx, sockReply, handle.wg, handle.chReply, handle.printLog)
        handle.sockRep = &sockServer{
            sock: sockReply,
            info: &ri.Proc,
        }
    }
    //发布消息的socket, pub是将消息发布到bus中,所以不需要指定key
    sockPub := bhomebus.OpenSocket()
    handle.sockPub = &sockClient{
        sock: sockPub,
        peer: -1,
    }
    //有订阅消息才需要启动协程接收消息
    if ri.SubTopic != nil && len(ri.SubTopic) > 0 {
        //订阅消息的socket
        sockSub := bhomebus.OpenSocket()
        //订阅所有主题
        handle.printLog("start Sub topics")
        handle.printLog("sub topics")
        var subList bhome_msg.MsgTopicList
        for _,v := range ri.SubTopic {
            subN := sockSub.Sub(v)
            handle.printLog("subTopic:", v, " ret n:", subN)
            subList.TopicList = append(subList.TopicList, []byte(v))
        }
        //启动订阅信息接收
        handle.wg.Add(1)
        go recvRoutine(ctx, sockSub, handle.wg, handle.chSub, handle.printLog)
        handle.sockSub = &sockClient{
            sock: sockSub,
            peer: -1,
        var subReply bhome_msg.MsgCommonReply
        if bhsgo.Subscribe(&subList, &subReply, handle.conf.sendTimeOut) {
            //启动订阅信息接收
            handle.wg.Add(1)
            go recvSubRoutine(ctx, handle.wg, handle.ChSub, handle.printLog)
        }
    }
    sockWorker := bhomebus.OpenSocket()
    handle.sockWorker = &sockClient{
        sock: sockWorker,
        peer: int(regR.QueryTopicKey),
    }
    return handle, nil
}
func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
    var procId string
    var msg bhome_msg.MsgPublish
    for {
        select {
        case <-ctx.Done():
            logFn("recvRoutine ctx.Done")
            wg.Done()
            return
        default:
            if bhsgo.ReadSub(&procId, &msg, 100) {
                ch <- msg
                procId = ""
                msg.Reset()
            } else {
                //time.Sleep(100 * time.Millisecond)
            }
        }
    }
}
//DeRegister
func (h *BHBus) DeRegister(dri *RegisterInfo) error {
    data, err := json.Marshal(*dri)
    if err != nil {
        return err
    }
    dRegData,err := json.Marshal(MsgInfo{
        MsgType: "",
        Topic: TOPIC_DEREGISTER,
        Body: data,
    })
    if err != nil {
        return err
    }
    //h.mtxWorker.Lock()
    //defer h.mtxWorker.Unlock()
    netNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: h.conf.regKey,
    })
    var retMsg []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(netNodes, dRegData, &retMsg, h.conf.sendTimeOut)
    if n == 0 {
        return nil
    }
    h.printLog("DeRegister retMsg:", retMsg)
    return fmt.Errorf("DeRegister n:%d", n)
    h.printLog("DeRegister")
    return nil
}
func (h *BHBus) printLog(v ...interface{}) {
@@ -318,75 +185,27 @@
    h.printLog("call BHBus free")
    h.wg.Wait()
    h.printLog("h.wg.Wait done")
    for _,v := range h.m {
        v.sock.Close()
    }
    if h.sockRep != nil {
        h.sockRep.sock.Close()
        h.sockRep = nil
    }
    if h.sockHB != nil {
        h.sockHB.sock.Close()
        h.sockHB = nil
    }
    if h.sockPub != nil {
        h.sockPub.sock.Close()
        h.sockPub = nil
    }
    if h.sockSub != nil {
        h.sockSub.sock.Close()
        h.sockSub = nil
    }
    if h.sockWorker != nil {
        h.sockWorker.sock.Close()
        h.sockWorker = nil
    }
    h.printLog("BHBus Freed")
}
//HeartBeat send
func (h *BHBus) HeartBeat(info *HeartBeatInfo) error {
    data, err := json.Marshal(*info)
    if err == nil {
        hbd,err := json.Marshal(MsgInfo{
            SrcProc: info.Proc,
            MsgType: MesgType_ReqRep,
            Topic:   TOPIC_HEARTBEAT,
            Body:    data,
        })
        if err != nil {
            h.printLog("marshal heartbeat msgInfo err:", err)
            return err
        }
        var rMsg []bhomebus.Mesg
        hbAddr := append([]bhomebus.NetNode{}, bhomebus.NetNode{
            Key: h.sockHB.peer,
        })
        //h.printLog("start send heartbeat")
        n := h.sockHB.sock.SendandrecvTimeout(hbAddr, hbd, &rMsg, h.conf.sendTimeOut) //n代表成功发送的节点的个数
        //h.printLog("sockHB.Sendandrecv n:", n, "len(rMsg):", len(rMsg))
        if n > 0 {
            return nil
        } else {
            return fmt.Errorf("sockHB Sendandrecv ret n:%d", n)
        }
func (h *BHBus) HeartBeat() error {
    procI := bhome_msg.ProcInfo{
        ProcId: []byte(h.ri.Proc.ID),
        Name: []byte(h.ri.Proc.Name),
    }
    return err
    var ret bhome_msg.MsgCommonReply
    if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
        return nil
    } else {
        return errors.New("send heartBeat return false")
    }
}
//func (h *BHBus) send2(s *sockClient, data []byte, timeout int) error {
//    n := s.sock.SendtoTimeout(data, s.peer, timeout)
//    if n == 0 {
//        return nil
//    }
//    return errors.New("SendtoTimeout n:"+strconv.Itoa(n))
//}
//更新主题列表
func (h *BHBus)  UpdateNodeTopics(arr []NodeInfo) {
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
    h.mtxNode.Lock()
    defer h.mtxNode.Unlock()
    h.nodes = arr
@@ -395,185 +214,92 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhomebus.NetNode,error) {
    h.mtxNode.Lock()
    defer h.mtxNode.Unlock()
    var nodes []bhomebus.NetNode
    reqNetNode := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: h.sockWorker.peer,
    })
    reqD,err := json.Marshal(MsgInfo{
        SrcProc: *srcProc,
        MsgType: MesgType_ReqRep,
        Topic:   TOPIC_QUERYKEY,
        Body:    []byte(topic),
    })
    if err != nil {
        return nil, fmt.Errorf("marshal req err:%s", err.Error())
    }
    var ret []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(reqNetNode, reqD, &ret, h.conf.sendTimeOut)
    if n > 0 {
        var reply Reply
        err = json.Unmarshal(ret[0].Data, &reply)
        if err != nil {
            h.printLog("unmarshal err:", err)
            return nil, err
        }
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]bhome_msg.BHAddress,error) {
        if reply.Success {
            rd,err := json.Marshal(reply.Data)
            if err == nil {
                err = json.Unmarshal(rd, &nodes)
                if err == nil {
                    return nodes, nil
                } else {
                    h.printLog("unmarshal err:", err, "nodes:", nodes, " center reply.Data:", reply.Data)
                    return nil, fmt.Errorf("unmarshal to nodes err:%s", err.Error())
                }
            } else {
                return nil, fmt.Errorf("marshal reply.Data err:%s", err.Error())
            }
        } else {
            h.printLog("reply success:", reply.Success, "msg:", reply.Msg, "data:", reply.Data)
            return nil, fmt.Errorf("REPLY msg:%s", reply.Msg)
        }
    } else {
        return nil,    fmt.Errorf("GetNetNodeByTopic ret n:%d", n)
    }
    return nil, nil
}
func (h *BHBus) Request(serverId string, req *MsgInfo, milliSecs int) (*Reply, error) {
func (h *BHBus) Request(serverId string, req *bhome_msg.MsgRequestTopic, milliSecs int) (*Reply, error) {
    //1.首先需要通过topic拿到本机对应的NetNode
    rNodes, err := h.GetNetNodeByTopic(serverId, &req.SrcProc, req.Topic)
    if err != nil {
        h.printLog("topic: ", req.Topic, " rNodes: ", rNodes, " err:", err)
        return nil, err
    }
    if rNodes == nil || len(rNodes) == 0 {
        return nil, errors.New("rNodes empty, topic: "+ req.Topic)
    }
    //2.将请求返送到对应的server,并等待返回值
    data, err := json.Marshal(*req)
    if err != nil {
        h.printLog("marshal(*req) err:", err)
        return nil, err
    }
    var ret []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, milliSecs)
    if n > 0 && len(ret) > 0 {
        var resp Reply
        if err = json.Unmarshal(ret[0].Data, &resp); err == nil {
            return &resp, nil
        } else {
            h.printLog("unmarshal ret[0].Data err:", err)
            return nil, err
    pid := ""
    mrt := bhome_msg.MsgRequestTopicReply{}
    dest := bhome_msg.BHAddress{}
    if bhsgo.Request(&dest, req, &pid, &mrt, milliSecs) {
        var reply Reply
        if err := json.Unmarshal(mrt.Data, &reply); err != nil {
            return nil,err
        }
        return &reply, nil
    } else {
        h.printLog("Request n: ", n, " len(ret): ", len(ret))
        return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
        return nil, errors.New("request ")
    }
}
func (h *BHBus) RequestOnly(rData []byte, rNodes []bhomebus.NetNode) ([]byte, error) {
    var ret []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(rNodes, rData, &ret, h.conf.sendTimeOut)
    if n > 0 && len(ret) > 0 {
        return ret[0].Data, nil
func (h *BHBus) RequestOnly(req *bhome_msg.MsgRequestTopic, destArr []bhome_msg.BHAddress) ([]byte, error) {
    dest := bhome_msg.BHAddress{}
    if destArr != nil && len(destArr) > 0 {
        dest = destArr[0]
    }
    pid := ""
    r := bhome_msg.MsgRequestTopicReply{}
    if bhsgo.Request(&dest, req, &pid, &r, h.conf.sendTimeOut) {
        return r.Data, nil
    } else {
        h.printLog("Request n: ", n, " len(ret): ", len(ret), "rData:", string(rData))
        return nil, fmt.Errorf("request err, SendandrecvTimeout n:%d", n)
        h.printLog("bhsgo.Request request err:", r.Errmsg)
        return nil, errors.New("bhsgo.Request return false")
    }
}
func (h *BHBus) Reply(replyKey int, i *Reply) error {
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
    data,err := json.Marshal(*i)
    if err != nil {
        return err
    }
    n := h.sockRep.sock.SendtoTimeout(data, replyKey, h.conf.sendTimeOut)
    h.printLog("reply to key:", replyKey, " n:",n)
    if n != 0 {
        return errors.New("reply sendToTimeOut n:"+strconv.Itoa(n))
    rep := bhome_msg.MsgRequestTopicReply{
        Data: data,
    }
    return nil
    if bhsgo.SendReply(src, &rep) {
        return nil
    }
    return errors.New("reply return false")
}
func (h *BHBus) RequestCenter(req *bhome_msg.MsgRequestTopic) (*Reply, error) {
//只发送请求,不需要应答.
//暴露在上层的,只有topic,没有key。
func (h *BHBus) SendOnly(key int, arg *MsgInfo) error {
    data,err := json.Marshal(*arg)
    if err != nil {
        return err
    }
    //h.mtxWorker.Lock()
    //defer h.mtxWorker.Unlock()
    n := h.sockWorker.sock.SendtoTimeout(data, key, h.conf.sendTimeOut)
    if n != 0 {
        return fmt.Errorf("sendOnly ret n:%d", n)
    }
    return nil
}
func (h *BHBus) RequestCenter(req *MsgInfo) (*Reply, error) {
    data, err := json.Marshal(*req)
    if err != nil {
        return nil, err
    }
    rNodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: KEY_QUERY,
    })
    //h.mtxWorker.Lock()
    //defer h.mtxWorker.Unlock()
    var ret []bhomebus.Mesg
    n := h.sockWorker.sock.SendandrecvTimeout(rNodes, data,&ret, h.conf.sendTimeOut)
    h.printLog("requestCenter n:", n, "len(ret):", len(ret))
    if n > 0 && len(ret) > 0{
        var cr Reply
        if err = json.Unmarshal(ret[0].Data, &cr); err == nil {
            return &cr, nil
        } else {
            h.printLog("unmarshal to CommonReply err:", err)
        }
    }
    return nil, fmt.Errorf("request center err")
    return nil, errors.New("")
}
//向主题通道中发布消息
func (h *BHBus) Pub(nodes []bhomebus.NetNode, msg *MsgInfo) error {
    data,err := json.Marshal(*msg)
    if err == nil {
        if n := h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, h.conf.pubTimeOut);n > 0 {
            return nil
        } else {
            return fmt.Errorf("pub err n:%d", n)
        }
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
    if bhsgo.Publish(msg, h.conf.pubTimeOut) {
        return nil
    } else {
        return fmt.Errorf("pub err ")
    }
    return err
}
func (h *BHBus) PubTimeout(nodes []bhomebus.NetNode, msg *MsgInfo, timeout int) int {
    data,err := json.Marshal(*msg)
    if err == nil {
        return h.sockPub.sock.PubTimeout(nodes, msg.Topic, data, timeout)
func (h *BHBus) PubTimeout(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish, timeout int) int {
    if bhsgo.Publish(msg, timeout) {
        return 1
    }
    return -1
}
//追加订阅的主题消息
func (h *BHBus) Sub(topics []string) {
    if topics != nil {
        for _,t := range topics {
            h.sockSub.sock.Sub(t)
    if topics != nil && len(topics) >0 {
        var subList bhome_msg.MsgTopicList
        for _, v := range topics {
            subList.TopicList = append(subList.TopicList, []byte(v))
        }
        var subReply bhome_msg.MsgCommonReply
        if bhsgo.Subscribe(&subList, &subReply, h.conf.sendTimeOut) {
            h.printLog("sub topics")
        }
    }
}
@@ -581,40 +307,6 @@
//注销订阅的主题
func (h *BHBus) DeSub(topics []string) {
    if topics != nil {
        for _,t := range topics {
            if n := h.sockSub.sock.Desub(t); n != 0 {
                h.printLog("DeSub topic:", t, " n:", n)
            }
        }
    }
}
//获取sub 或者需要reply的消息
//func (h *BHBus) GetMsg() (subMsg *MsgInfo) {
//    if h.sockHB == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
//        return nil
//    }
//    if len(h.chSub) >0 {
//        m  := <-h.chSub
//        subMsg = m.info
//    }
//    return
//}
func (h *BHBus) GetMsg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
    if h.sockHB == nil && h.sockRep == nil && h.sockPub == nil && h.sockSub == nil && h.sockWorker == nil {
        return nil, nil, -1
    }
    if len(h.chSub) >0 {
        m  := <-h.chSub
        subMsg = m.info
    }
    if len(h.chReply) >0 {
        m := <-h.chReply
        replyMsg = m.info
        replyKey = m.port
    }
    return
}
hbusc.proto
File was deleted
message.go
@@ -35,8 +35,8 @@
)
type NodeList struct {
    Ip string `json:"ip"`
    Port int `json:"port"`
    Ip             string         `json:"ip"`
    Port         int         `json:"port"`
}
const (
@@ -60,43 +60,10 @@
}
type RegisterInfo struct {
    Proc       ProcInfo `json:"proc"`       // 进程的信息
    Channel    []string `json:"channel"`    // 新增频道,对应一个新的共享内存队列
    PubTopic   []string `json:"pubTopic"`   // 进程对外发布的服务主题
    SubTopic   []string `json:"subTopic"`   // 进程订阅的服务主题
}
type RegisterReply struct {
    TCPProxyIP         string                 `json:"tcpProxyIP"`     // BHomeCenter启动的tcp代理服务器IP
    TCPProxyPort       int                    `json:"tcpProxyPort"`   // BHomeCenter启动的tcp代理服务器端口
    HeartbeatKey       int                    `json:"heartbeatKey"`   // client发送心跳的key
    ReplyKey           int                    `json:"replyKey"`       // client的应答服务key
    ChannelKey         map[string]int         `json:"channelKey"`     // client的chan对应的key
    QueryTopicKey      int                    `json:"queryTopicKey"`  // client查询topic对应的key时用到的key
    Status             int                    `json:"status"`         // 请求状态,目前只有两个,成功返回200,失败202
}
type HeartBeatInfo struct {
    Proc              ProcInfo             `json:"proc"`          // 进程的信息
    HealthLevel       string               `json:"healthLevel"`   // 健康等级
    Fps               int                  `json:"fps"`           // 处理帧率(dec解码帧率、sdk处理帧率)
    WarnInfo          string               `json:"warnInfo"`      // 报警信息
    ErrorInfo         string               `json:"errorInfo"`     // 错误信息
    OtherInfo         []byte               `json:"otherInfo"`     // 其他特有信息,如有需要就用这个
    OtherInfoSize     int                  `json:"otherInfoSize"` // 其他特有信息长度
}
type HeartBeatReply struct {
    Status             int                    `json:"status"`         // 请求状态,目前只有两个,成功返回200,失败202
    Desc               string                 `json:"desc"`           // 请求状态的描述,成功"success",失败返回失败原因,如心跳服务未启动
}
type MsgInfo struct {
    SrcProc          ProcInfo               `json:"srcProc"`        // 源进程基本信息
    MsgType           string                 `json:"msgType"`        // 数据类型,可为请求、发布、订阅、应答等
    Topic             string                `json:"topic"`            // 请求的函数,并不对应任何的shmKey,业务层的topic
    Body             []byte                `json:"body"`            // 请求内容
    Proc               ProcInfo `json:"proc"`       // 进程的信息
    Channel            []string `json:"channel"`    // 新增频道,对应一个新的共享内存队列
    PubTopic           []string `json:"pubTopic"`   // 进程对外发布的服务主题
    SubTopic           []string `json:"subTopic"`   // 进程订阅的服务主题
}
micronode.go
@@ -1,7 +1,7 @@
package bhomeclient
import (
    "basic.com/valib/bhomebus.git"
    "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
    "context"
    "encoding/json"
    "errors"
@@ -20,7 +20,7 @@
    serverId     string
    fnLog         func(...interface{})
    SubCh         chan *MsgInfo
    SubCh         chan *bhome_msg.MsgPublish
    mtx         sync.Mutex
    started     bool
@@ -39,7 +39,7 @@
        reg:      reg,
        procInfo: &reg.Proc,
        fnLog:    fnLog,
        SubCh:    make(chan *MsgInfo, 512),
        SubCh:    make(chan *bhome_msg.MsgPublish, 512),
    }
    return mn, nil
@@ -65,14 +65,6 @@
}
func (ms *MicroNode) startHeartbeat() {
    hbi := &HeartBeatInfo{
        HealthLevel: "health",
        Fps:         12,
        WarnInfo:    "warn",
        ErrorInfo:   "error",
        Proc:    *ms.procInfo,
    }
    t := time.NewTicker(1 * time.Second)
    defer t.Stop()
@@ -81,7 +73,9 @@
        case <-ms.ctx.Done():
            return
        case <-t.C:
            ms.handle.HeartBeat(hbi)
            ms.handle.HeartBeat()
        default:
            time.Sleep(500 * time.Millisecond)
        }
    }
}
@@ -110,52 +104,18 @@
            select {
            case <- ms.ctx.Done():
                return
            case msgR := <-ms.handle.ChReply: //收到其它进程的请求消息
                go ms.serve(ms.handle.ctx, &msgR)
            case msgS := <-ms.handle.ChSub:
                ms.printLog("Recv Sub Message:", string(msgS.Data))
                ms.SubCh <- &msgS
            default:
                msgS, msgR, keyR := ms.handle.GetMsg()
                if msgS != nil {
                    //收到其它进程的发布消息
                    ms.printLog("Recv Sub Message:", string(msgS.Body))
                    ms.SubCh <- msgS
                }
                if msgR != nil {
                    //收到其它进程的请求消息
                    go ms.serve(msgR, keyR)
                }
                time.Sleep(50 * time.Millisecond)
            }
        }
        //接收订阅到的消息
        //go ms.startRecvSubMsg()
        //作为server启动
        //ms.serve()
    }
    ms.mtx.Unlock()
}
//开始接收订阅消息
//func (ms *MicroNode) startRecvSubMsg() {
//    for {
//        select {
//        case <- ms.ctx.Done():
//            return
//        default:
//            msgS, msgR, keyR := ms.handle.GetMsg()
//            if msgS != nil {
//                //收到其它进程的发布消息
//                ms.printLog("Recv Sub Message:", string(msgS.Body))
//                ms.SubCh <- msgS
//            }
//            if msgR != nil {
//                //收到其它进程的请求消息
//                go ms.serve(msgR, keyR)
//            }
//
//            time.Sleep(50 * time.Millisecond)
//        }
//    }
//}
func (ms *MicroNode) Request(serverId string, request Request, milliSecs int) (*Reply,error) {
    t := time.Now()
@@ -163,9 +123,9 @@
    ms.printLog("1:", time.Since(t))
    t = time.Now()
    rb, _ := json.Marshal(request)
    msgR := &MsgInfo {
        Topic: request.Path,
        Body: rb,
    msgR := &bhome_msg.MsgRequestTopic{
        Topic: []byte(request.Path),
        Data: rb,
    }
    ms.printLog("2:", time.Since(t))
    return ms.handle.Request(serverId, msgR, milliSecs)
@@ -173,20 +133,20 @@
func (ms *MicroNode) RequestTopic(serverId string, request Request, milliSecs int) (*Reply,error) {
    rb, _ := json.Marshal(request)
    msgR := &MsgInfo{
        Topic: request.Path,
        Body: rb,
    msgR := &bhome_msg.MsgRequestTopic{
        Topic: []byte(request.Path),
        Data: rb,
    }
    return ms.handle.Request(serverId, msgR, milliSecs)
}
func (ms *MicroNode) RequestOnly(rData []byte, nodes []bhomebus.NetNode) ([]byte, error) {
    return ms.handle.RequestOnly(rData, nodes)
func (ms *MicroNode) RequestOnly(req *bhome_msg.MsgRequestTopic, dest []bhome_msg.BHAddress) ([]byte, error) {
    return ms.handle.RequestOnly(req, dest)
}
//获取本机中某一个主题的 key  (结果只有一个元素)
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhomebus.NetNode {
func (ms *MicroNode) GetLocalNetNodeByTopic(topicName string) []bhome_msg.BHAddress {
    netNodes, err := ms.handle.GetNetNodeByTopic(ms.serverId, ms.procInfo, topicName)
    if err != nil {
        ms.printLog("topic:",topicName, " netNodes:", netNodes, "err:", err)
@@ -196,7 +156,7 @@
}
//获取集群中所有节点某个主题的key信息,   (结果可能有多个)
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhomebus.NetNode {
func (ms *MicroNode) GetAllNetNodesByTopic(topicName string) []bhome_msg.BHAddress {
    netNodes, err := ms.handle.GetNetNodeByTopic("", ms.procInfo, topicName)
    if err != nil {
        return nil
@@ -205,11 +165,7 @@
}
func (ms *MicroNode) GetRegisteredClient() ([]RegisteredClient,error) {
    r := MsgInfo{
        SrcProc: *ms.procInfo,
        MsgType: MesgType_ReqRep,
        Topic: TOPIC_QUERYPROC,
    }
    r := bhome_msg.MsgRequestTopic{}
    cr, err := ms.handle.RequestCenter(&r)
    if err != nil {
        ms.printLog("requestCenter reply:", cr, "err:", err)
@@ -234,76 +190,37 @@
    return nil, fmt.Errorf("GetRegisteredClient list failed")
}
//func (ms *MicroNode) call(rdata []byte, rkey int, sdata *[]byte) bool {
//    ri := &Reply{}
//    if ms.handlers == nil {
//        ri.Msg = "send wrong addr, check yourself!!!"
//    } else {
//        var msgR MsgInfo
//        err := json.Unmarshal(rdata, &msgR)
//        if err != nil {
//            ri.Msg = err.Error()
//        } else {
//            var reqBody Request
//            err = json.Unmarshal(rdata, &msgR.Body)
//            if err != nil {
//                ri.Msg = err.Error()
//            } else {
//                ms.printLog("srcProc:", reqBody.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", rkey)
//                if f,ok := ms.handlers[reqBody.Path];ok {
//                    reqBody.SrcProc = msgR.SrcProc
//                    ri = f(&reqBody)
//                    ms.printLog("call funcMap f,reply:", *ri)
//                } else {
//                    ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
//                    ri.Msg = "请求的接口不存在,请检查url"
//                }
//            }
//        }
//    }
//    result, err := json.Marshal(*ri)
//    if err != nil {
//        sdata = nil
//    } else {
//        sdata = &result
//    }
//    return ri.Success
//}
//func (ms *MicroNode) serve() {
//    if ms.handlers == nil {
//        return
//    }
//    for i:=0;i<10;i++ {
//        ms.handle.wg.Add(1)
//        go recvandsendRoutine(ms.ctx, ms.handle.sockRep.sock, ms.handle.wg, ms.call, ms.fnLog)
//    }
//}
func (ms *MicroNode) serve(msgR *MsgInfo, p int) {
func (ms *MicroNode) serve(ctx context.Context, msgR *MsgReq) {
    if ms.handlers == nil {
        return
    }
    var reqBody Request
    var ri *Reply
    err := json.Unmarshal(msgR.Body, &reqBody)
    err := json.Unmarshal(msgR.Data, &reqBody)
    if err != nil {
        ms.printLog("serve unmarshal msgR.Body err:", err)
        ri = &Reply {
            Msg: err.Error(),
        }
    } else {
        ms.printLog("srcProc:", msgR.SrcProc,"reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap, "to key: ", p)
        ms.printLog("reqBody Path:", reqBody.Path, " contentType:", reqBody.ContentType, " formMap:",reqBody.FormMap, " postFormMap:", reqBody.PostFormMap)
        if f,ok := ms.handlers[reqBody.Path];ok {
            reqBody.SrcProc = msgR.SrcProc
            reqBody.SrcProc = ProcInfo{
                ID: msgR.ProcId,
            }
            h := WrapperHandler{
                ms,
                ms,
            }
            ri = f(&h, &reqBody)
            ms.printLog("call funcMap f,reply.Success:", ri.Success)
            select {
            case <-ctx.Done():
                ms.printLog("get ctx.Done before f(&h, &reqBody) return, exit!!!")
            default:
                ri = f(&h, &reqBody)
                ms.printLog("call funcMap f,reply.Success:", ri.Success)
            }
        } else {
            ms.printLog("ms.funcMap not eixst path: ", reqBody.Path)
            ri = &Reply{
@@ -314,7 +231,7 @@
        }
    }
    retErr := ms.handle.Reply(p, ri)
    retErr := ms.handle.Reply(msgR.Src, ri)
    if retErr != nil {
        ms.printLog("retErr:", retErr)
    }
@@ -322,24 +239,22 @@
//发布到本机
func (ms *MicroNode) Publish(topic string,msg []byte) error {
    nodes := append([]bhomebus.NetNode{}, bhomebus.NetNode{
        Key: 8,
    })
    var nodes []bhome_msg.BHAddress
    return ms.PublishNet(nodes, topic, msg)
}
func (ms *MicroNode) PublishNet(nodes []bhomebus.NetNode, topic string,msg []byte) error {
    pi := &MsgInfo{
        Topic: topic,
        Body: msg,
func (ms *MicroNode) PublishNet(nodes []bhome_msg.BHAddress, topic string,data []byte) error {
    pi := &bhome_msg.MsgPublish{
        Topic: []byte(topic),
        Data: data,
    }
    return ms.handle.Pub(nodes, pi)
}
func (ms *MicroNode) PublishNetTimeout(nodes []bhomebus.NetNode, topic string, msg []byte, timeout int) int {
    pi := &MsgInfo{
        Topic: topic,
        Body: msg,
func (ms *MicroNode) PublishNetTimeout(nodes []bhome_msg.BHAddress, topic string, data []byte, timeout int) int {
    pi := &bhome_msg.MsgPublish{
        Topic: []byte(topic),
        Data: data,
    }
    return ms.handle.PubTimeout(nodes, pi, timeout)
}