zhangmeng
2023-12-05 2d5c411a22a653eb7cbde621db4e89b07755a852
remove sonic json dependency
1个文件已修改
57 ■■■■ 已修改文件
hbusc.go 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
hbusc.go
@@ -14,29 +14,29 @@
)
type MsgReq struct {
    ProcId         string
    ProcId string
    bhome_msg.MsgRequestTopic
    Src         unsafe.Pointer
    Src unsafe.Pointer
}
type BHBus struct {
    ctx         context.Context
    ctx context.Context
    ri             *RegisterInfo
    ri *RegisterInfo
    conf         *Config
    conf *Config
    nodes         []NodeInfo     //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
    mtxNode     sync.Mutex   //访问节点主题表时,需要加锁
    nodes   []NodeInfo //集群中节点状态信息,以及每个节点上的topic信息。集群状态下需要借助serf进行同步
    mtxNode sync.Mutex //访问节点主题表时,需要加锁
    wg             *sync.WaitGroup
    wg *sync.WaitGroup
    ChSub   chan bhome_msg.MsgPublish
    ChReply chan MsgReq
}
//获取其他进程发给此socket的消息,可以是sub的接收,也可以是reply的接收。
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<-MsgReq, logFn func(...interface{})) {
func recvRequestRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- MsgReq, logFn func(...interface{})) {
    var procId string
    var msg bhome_msg.MsgRequestTopic
    var src unsafe.Pointer
@@ -66,12 +66,12 @@
}
//Register
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus,error) {
    handle := &BHBus {
func Register(ctx context.Context, q chan os.Signal, config *Config, ri *RegisterInfo) (*BHBus, error) {
    handle := &BHBus{
        ctx:     ctx,
        conf:    config,
        ri:      ri,
        wg: &sync.WaitGroup{},
        wg:      &sync.WaitGroup{},
        ChSub:   make(chan bhome_msg.MsgPublish, config.chSize),
        ChReply: make(chan MsgReq, config.chSize),
    }
@@ -79,7 +79,7 @@
    //如果注册失败,就会一直尝试注册
    procI := bhome_msg.ProcInfo{
        ProcId: []byte(ri.Proc.ID),
        Name: []byte(ri.Proc.Name),
        Name:   []byte(ri.Proc.Name),
    }
    var regReply bhome_msg.MsgCommonReply
loop:
@@ -87,7 +87,7 @@
        select {
        case <-q:
            handle.printLog("register <-q")
            return nil,errors.New("ctx is done")
            return nil, errors.New("ctx is done")
        default:
            if bhsgo.Register(&procI, &regReply, handle.conf.sendTimeOut) {
@@ -101,7 +101,7 @@
    if ri.PubTopic != nil && len(ri.PubTopic) > 0 {
        topics := bhome_msg.MsgTopicList{}
        var regTopicReply bhome_msg.MsgCommonReply
        for _,t := range ri.PubTopic {
        for _, t := range ri.PubTopic {
            topics.TopicList = append(topics.TopicList, []byte(t))
        }
    loopRT:
@@ -124,13 +124,13 @@
        go recvRequestRoutine(ctx, handle.wg, handle.ChReply, handle.printLog)
    }
    handle.printLog("register done!" )
    handle.printLog("register done!")
    //有订阅消息才需要启动协程接收消息
    if len(ri.SubTopic) > 0 {
        handle.printLog("sub topics")
        var subList bhome_msg.MsgTopicList
        for _,v := range ri.SubTopic {
        for _, v := range ri.SubTopic {
            subList.TopicList = append(subList.TopicList, []byte(v))
        }
@@ -143,7 +143,7 @@
    if len(ri.SubNetTopic) > 0 {
        handle.printLog("sub net topics")
        var subNetList bhome_msg.MsgTopicList
        for _,v := range ri.SubNetTopic {
        for _, v := range ri.SubNetTopic {
            subNetList.TopicList = append(subNetList.TopicList, []byte(v))
        }
        var subNetReply bhome_msg.MsgCommonReply
@@ -161,7 +161,7 @@
    return handle, nil
}
func recvSubRoutine(ctx context.Context,wg *sync.WaitGroup, ch chan <-bhome_msg.MsgPublish, logFn func(...interface{})) {
func recvSubRoutine(ctx context.Context, wg *sync.WaitGroup, ch chan<- bhome_msg.MsgPublish, logFn func(...interface{})) {
    var procId string
    var msg bhome_msg.MsgPublish
    for {
@@ -190,7 +190,7 @@
    h.printLog("DeRegister")
    req := bhome_msg.ProcInfo{
        ProcId: []byte(h.ri.Proc.ID),
        Name: []byte(h.ri.Proc.Name),
        Name:   []byte(h.ri.Proc.Name),
    }
    reply := bhome_msg.MsgCommonReply{}
    if !bhsgo.Unregister(&req, &reply, h.conf.sendTimeOut) {
@@ -214,12 +214,11 @@
    h.printLog("h.wg.Wait done")
}
//HeartBeat send
func (h *BHBus) HeartBeat() error {
    procI := bhome_msg.ProcInfo{
        ProcId: []byte(h.ri.Proc.ID),
        Name: []byte(h.ri.Proc.Name),
        Name:   []byte(h.ri.Proc.Name),
    }
    var ret bhome_msg.MsgCommonReply
    if bhsgo.Heartbeat(&procI, &ret, h.conf.sendTimeOut) {
@@ -228,8 +227,6 @@
        return errors.New("send heartBeat return false")
    }
}
//更新主题列表
func (h *BHBus) UpdateNodeTopics(arr []NodeInfo) {
@@ -241,7 +238,7 @@
//获取topic对应的key
//如果传了serverId不为空,则获取指定机器上的topic-key
//如果server为空,则获取所有节点上topic-key
func (h *BHBus) GetNetNodeByTopic(serverId string,srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress,error) {
func (h *BHBus) GetNetNodeByTopic(serverId string, srcProc *ProcInfo, topic string) ([]*bhome_msg.MsgQueryTopicReply_BHNodeAddress, error) {
    dest := bhome_msg.BHAddress{}
    reqTopic := bhome_msg.MsgQueryTopic{
        Topic: []byte(topic),
@@ -267,7 +264,7 @@
        var reply Reply
        if err := json.Unmarshal(mrt.Data, &reply); err != nil {
            h.printLog("bhsgo.Request ret true, but unmarshal err:", err, " mrt.Data:", string(mrt.Data))
            return nil,err
            return nil, err
        }
        return &reply, nil
@@ -297,13 +294,14 @@
}
func (h *BHBus) Reply(src unsafe.Pointer, i *Reply) error {
    data,err := json.Marshal(i)
    data, err := json.Marshal(i)
    if err != nil {
        return err
    }
    rep := bhome_msg.MsgRequestTopicReply{
        Data: data,
    }
    if bhsgo.SendReply(src, &rep) {
        return nil
    }
@@ -321,7 +319,6 @@
        return nil, errors.New("QueryProcs ret flase")
    }
}
//向主题通道中发布消息
func (h *BHBus) Pub(nodes []bhome_msg.BHAddress, msg *bhome_msg.MsgPublish) error {
@@ -341,7 +338,7 @@
//追加订阅的主题消息
func (h *BHBus) Sub(topics []string) {
    if topics != nil && len(topics) >0 {
    if topics != nil && len(topics) > 0 {
        var subList bhome_msg.MsgTopicList
        for _, v := range topics {
            subList.TopicList = append(subList.TopicList, []byte(v))
@@ -359,4 +356,4 @@
    if topics != nil {
    }
}
}