zhangmeng
2020-07-31 dba48754d9623a49b155e94c65341b773bf4eeef
add get topic key
3个文件已修改
699 ■■■■ 已修改文件
library.go 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbus.pb.go 576 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbus.proto 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go
@@ -3,6 +3,7 @@
import (
    "context"
    "fmt"
    "sync"
    "time"
    "github.com/golang/protobuf/proto"
@@ -17,12 +18,12 @@
    GetTopicInfoTypeChannel = "getchannel"
)
type subOReply struct {
type sockServer struct {
    sock *DgramSocket
    info *ProcInfo
}
type sockRe struct {
type sockClient struct {
    sock *DgramSocket
    peer int
}
@@ -34,21 +35,31 @@
}
// Handle handler
/*
sockHB/sockPub/sockWorker可以使用一个socket
但是由于需要支持多线程且心跳/发布都是很重要的信息,单独一个socket处理
worker处理短时的发送
*/
type Handle struct {
    ctx context.Context
    // 创建channel对应的reply,等待读取其中的内容,server
    // 其中必须有一个作为Request函数的server
    m map[string]*subOReply
    // 创建心跳连接,client,仅发送心跳信息
    sockHB *sockRe
    // 创建更新主题连接,client,仅发送主题更新信息
    sockUp *sockRe
    // 创建订阅的socket
    sockSub *subOReply
    m map[string]*sockServer
    // 创建reply服务Request函数
    sockRep *sockRe
    sockRep *sockServer
    // 创建心跳连接,client,仅发送心跳信息
    // 心跳需要保证单独的socket发送,如果跟其他共用socket,如果阻塞就无法发送
    sockHB *sockClient
    // 创建更新主题连接,client,仅发送主题更新信息
    // 发送本身的pub信息,很可能其他进程依赖,需要单独socket处理
    sockPub *sockClient
    // 创建订阅的socket
    // 订阅的主题发送的消息
    sockSub *sockClient
    // 创建一个万能socket发送给任意server
    sockWorker *sockRe
    sockWorker *sockClient
    // 多线程
    mtxWorker sync.Mutex
    chSub   chan TransInfo
    chReply chan TransInfo
@@ -62,7 +73,7 @@
        v.sock.Close()
    }
    h.sockHB.sock.Close()
    h.sockUp.sock.Close()
    h.sockPub.sock.Close()
    h.sockSub.sock.Close()
    h.sockRep.sock.Close()
    h.sockWorker.sock.Close()
@@ -89,7 +100,7 @@
// Register reg
func Register(ctx context.Context, info *RegisterInfo) *Handle {
    m := make(map[string]*subOReply)
    m := make(map[string]*sockServer)
    // 首先请求一堆key
    sockReg := OpenDgramSocket()
@@ -125,12 +136,12 @@
        return nil
    }
    // 收发req/rep channel
    // 收发req/rep channel, server
    for _, v := range info.Channel {
        if k, ok := regReply.ChannelKey[v]; ok {
            s := OpenDgramSocket()
            s.Bind(int(k))
            m[v] = &subOReply{
            m[v] = &sockServer{
                sock: s,
                info: info.ProcInfo,
            }
@@ -141,20 +152,29 @@
    chSub := make(chan TransInfo, chSize)
    chReply := make(chan TransInfo, chSize)
    // heartbeat使用一个socket
    // reply使用一个,服务Request, server
    sockReply := OpenDgramSocket()
    sockReply.Bind(int(regReply.ReplyKey))
    // 启动接收线程
    go recvRoutine(ctx, sockReply, chSub)
    repS := &sockServer{
        sock: sockReply,
        info: info.ProcInfo,
    }
    // heartbeat使用一个socket, client
    sockHB := OpenDgramSocket()
    hbr := &sockRe{
    hbC := &sockClient{
        sock: sockHB,
        peer: int(regReply.HeartbeatKey),
    }
    // 更新主题使用一个
    // 发布主题使用一个, client
    sockUp := OpenDgramSocket()
    upr := &sockRe{
    pubC := &sockClient{
        sock: sockUp,
        peer: int(regReply.UpdateTopicKey),
    }
    // sub使用一个socket
    // sub使用一个socket, client
    sockSub := OpenDgramSocket()
    // sockSub.Bind(int(regReply.SubTopicKey))
    // 订阅主题
@@ -163,34 +183,25 @@
    }
    // 启动接收线程
    go recvRoutine(ctx, sockSub, chSub)
    sub := &subOReply{
    subC := &sockClient{
        sock: sockSub,
        info: info.ProcInfo,
    }
    // reply使用一个,服务Request
    sockReply := OpenDgramSocket()
    sockReply.Bind(int(regReply.ReplyKey))
    // 启动接收线程
    go recvRoutine(ctx, sockReply, chSub)
    rer := &sockRe{
        sock: sockReply,
        peer: -1,
    }
    // 万能socket,仅作为客户端使用
    sockW := OpenDgramSocket()
    swr := &sockRe{
    uniC := &sockClient{
        sock: sockW,
        peer: -1,
    }
    handle := &Handle{
        ctx:        ctx,
        m:          m,
        sockHB:     hbr,
        sockUp:     upr,
        sockSub:    sub,
        sockRep:    rer,
        sockWorker: swr,
        sockHB:     hbC,
        sockPub:    pubC,
        sockSub:    subC,
        sockRep:    repS,
        sockWorker: uniC,
        chSub:      chSub,
        chReply:    chReply,
    }
@@ -207,11 +218,27 @@
    if v, ok := h.m[topic]; ok {
        return v.sock.Port()
    }
    // 远程获取
    msg := &TopicInfo{
        Topic:     topic,
        TopicType: typ,
    }
    if data, err := proto.Marshal(msg); err == nil {
        h.mtxWorker.Lock()
        if rdata, err := h.sockWorker.sock.SendAndRecv(data, h.sockWorker.peer); err == nil {
            h.mtxWorker.Unlock()
            var rmsg *TopicInfoReply
            if err := proto.Unmarshal(rdata, rmsg); err == nil {
                return int(rmsg.Key)
            }
        }
        h.mtxWorker.Unlock()
    }
    return -1
}
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
    if r := sr.sock.SendTo(data, sr.peer); r != 0 {
func (h *Handle) send2(sc *sockClient, data []byte, logID string) error {
    if r := sc.sock.SendTo(data, sc.peer); r != 0 {
        return fmt.Errorf("%s SendTo Failed: %d", logID, r)
    }
    return nil
@@ -228,6 +255,8 @@
// SendOnly no recv
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
    h.mtxWorker.Lock()
    defer h.mtxWorker.Unlock()
    msg, err := proto.Marshal(info)
    if err == nil {
        if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
@@ -241,13 +270,16 @@
func (h *Handle) Pub(info *MsgInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(h.sockUp, msg, "Pub")
        return h.send2(h.sockPub, msg, "Pub")
    }
    return err
}
// Request req sync
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
    h.mtxWorker.Lock()
    defer h.mtxWorker.Unlock()
    msg, err := proto.Marshal(info)
    if err != nil {
        return nil
softbus.pb.go
@@ -187,6 +187,7 @@
    SubTopicKey          int32            `protobuf:"varint,4,opt,name=subTopicKey,proto3" json:"subTopicKey,omitempty"`
    UpdateTopicKey       int32            `protobuf:"varint,5,opt,name=updateTopicKey,proto3" json:"updateTopicKey,omitempty"`
    ReplyKey             int32            `protobuf:"varint,6,opt,name=replyKey,proto3" json:"replyKey,omitempty"`
    GetTopicKey          int32            `protobuf:"varint,7,opt,name=getTopicKey,proto3" json:"getTopicKey,omitempty"`
    XXX_NoUnkeyedLiteral struct{}         `json:"-"`
    XXX_unrecognized     []byte           `json:"-"`
    XXX_sizecache        int32            `json:"-"`
@@ -267,6 +268,123 @@
    return 0
}
func (m *RegisterInfoReply) GetGetTopicKey() int32 {
    if m != nil {
        return m.GetTopicKey
    }
    return 0
}
type TopicInfo struct {
    Topic                string   `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
    TopicType            string   `protobuf:"bytes,2,opt,name=topicType,proto3" json:"topicType,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}
func (m *TopicInfo) Reset()         { *m = TopicInfo{} }
func (m *TopicInfo) String() string { return proto.CompactTextString(m) }
func (*TopicInfo) ProtoMessage()    {}
func (*TopicInfo) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{3}
}
func (m *TopicInfo) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
}
func (m *TopicInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    if deterministic {
        return xxx_messageInfo_TopicInfo.Marshal(b, m, deterministic)
    } else {
        b = b[:cap(b)]
        n, err := m.MarshalTo(b)
        if err != nil {
            return nil, err
        }
        return b[:n], nil
    }
}
func (m *TopicInfo) XXX_Merge(src proto.Message) {
    xxx_messageInfo_TopicInfo.Merge(m, src)
}
func (m *TopicInfo) XXX_Size() int {
    return m.Size()
}
func (m *TopicInfo) XXX_DiscardUnknown() {
    xxx_messageInfo_TopicInfo.DiscardUnknown(m)
}
var xxx_messageInfo_TopicInfo proto.InternalMessageInfo
func (m *TopicInfo) GetTopic() string {
    if m != nil {
        return m.Topic
    }
    return ""
}
func (m *TopicInfo) GetTopicType() string {
    if m != nil {
        return m.TopicType
    }
    return ""
}
type TopicInfoReply struct {
    Info                 *TopicInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"`
    Key                  int32      `protobuf:"varint,2,opt,name=key,proto3" json:"key,omitempty"`
    XXX_NoUnkeyedLiteral struct{}   `json:"-"`
    XXX_unrecognized     []byte     `json:"-"`
    XXX_sizecache        int32      `json:"-"`
}
func (m *TopicInfoReply) Reset()         { *m = TopicInfoReply{} }
func (m *TopicInfoReply) String() string { return proto.CompactTextString(m) }
func (*TopicInfoReply) ProtoMessage()    {}
func (*TopicInfoReply) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{4}
}
func (m *TopicInfoReply) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
}
func (m *TopicInfoReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    if deterministic {
        return xxx_messageInfo_TopicInfoReply.Marshal(b, m, deterministic)
    } else {
        b = b[:cap(b)]
        n, err := m.MarshalTo(b)
        if err != nil {
            return nil, err
        }
        return b[:n], nil
    }
}
func (m *TopicInfoReply) XXX_Merge(src proto.Message) {
    xxx_messageInfo_TopicInfoReply.Merge(m, src)
}
func (m *TopicInfoReply) XXX_Size() int {
    return m.Size()
}
func (m *TopicInfoReply) XXX_DiscardUnknown() {
    xxx_messageInfo_TopicInfoReply.DiscardUnknown(m)
}
var xxx_messageInfo_TopicInfoReply proto.InternalMessageInfo
func (m *TopicInfoReply) GetInfo() *TopicInfo {
    if m != nil {
        return m.Info
    }
    return nil
}
func (m *TopicInfoReply) GetKey() int32 {
    if m != nil {
        return m.Key
    }
    return 0
}
type HeartbeatInfo struct {
    HealthLevel          string    `protobuf:"bytes,1,opt,name=healthLevel,proto3" json:"healthLevel,omitempty"`
    Fps                  int32     `protobuf:"varint,2,opt,name=fps,proto3" json:"fps,omitempty"`
@@ -284,7 +402,7 @@
func (m *HeartbeatInfo) String() string { return proto.CompactTextString(m) }
func (*HeartbeatInfo) ProtoMessage()    {}
func (*HeartbeatInfo) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{3}
    return fileDescriptor_61abf516b68179fd, []int{5}
}
func (m *HeartbeatInfo) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
@@ -378,7 +496,7 @@
func (m *MsgInfo) String() string { return proto.CompactTextString(m) }
func (*MsgInfo) ProtoMessage()    {}
func (*MsgInfo) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{4}
    return fileDescriptor_61abf516b68179fd, []int{6}
}
func (m *MsgInfo) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
@@ -454,6 +572,8 @@
    proto.RegisterType((*RegisterInfo)(nil), "softbus.RegisterInfo")
    proto.RegisterType((*RegisterInfoReply)(nil), "softbus.RegisterInfoReply")
    proto.RegisterMapType((map[string]int32)(nil), "softbus.RegisterInfoReply.ChannelKeyEntry")
    proto.RegisterType((*TopicInfo)(nil), "softbus.TopicInfo")
    proto.RegisterType((*TopicInfoReply)(nil), "softbus.TopicInfoReply")
    proto.RegisterType((*HeartbeatInfo)(nil), "softbus.HeartbeatInfo")
    proto.RegisterType((*MsgInfo)(nil), "softbus.MsgInfo")
}
@@ -461,41 +581,45 @@
func init() { proto.RegisterFile("softbus.proto", fileDescriptor_61abf516b68179fd) }
var fileDescriptor_61abf516b68179fd = []byte{
    // 534 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xdd, 0x8a, 0xd3, 0x40,
    0x14, 0x36, 0xfd, 0xdd, 0x3d, 0x6d, 0x75, 0x77, 0x10, 0x09, 0x8b, 0x94, 0x12, 0x44, 0x44, 0xb1,
    0x17, 0xeb, 0x8d, 0x08, 0xde, 0x68, 0x05, 0xab, 0x55, 0x4a, 0xdc, 0x17, 0x48, 0xda, 0xd3, 0xa6,
    0x98, 0x66, 0xc2, 0x64, 0x5a, 0x89, 0x2f, 0xe1, 0xad, 0x0f, 0x21, 0x3e, 0x87, 0x97, 0xde, 0x79,
    0x2b, 0xf5, 0x05, 0x7c, 0x04, 0x39, 0x27, 0x33, 0x69, 0xba, 0x22, 0xb2, 0x77, 0xe7, 0x3b, 0xe7,
    0x9b, 0xc9, 0xf7, 0xcd, 0x7c, 0x13, 0xe8, 0x65, 0x72, 0xa1, 0xc3, 0x4d, 0x36, 0x4c, 0x95, 0xd4,
    0x52, 0xb4, 0x0d, 0xf4, 0xbe, 0x3a, 0x70, 0x34, 0x55, 0x72, 0x36, 0x4e, 0x16, 0x52, 0x9c, 0xc1,
    0x51, 0x86, 0x6a, 0x8b, 0x6a, 0x3c, 0x72, 0x9d, 0x81, 0x73, 0xef, 0xd8, 0x2f, 0xb1, 0x70, 0xa1,
    0x1d, 0xca, 0x40, 0xcd, 0xc7, 0x23, 0xb7, 0xc6, 0x23, 0x0b, 0x2b, 0xab, 0xa6, 0x6e, 0xfd, 0x60,
    0xd5, 0x94, 0x66, 0xa9, 0x92, 0xb3, 0xb7, 0xc1, 0x1a, 0xdd, 0x46, 0x31, 0xb3, 0x58, 0xdc, 0x82,
    0x16, 0xd5, 0xe3, 0x91, 0xdb, 0xe4, 0x89, 0x41, 0xe2, 0x36, 0x1c, 0x53, 0x35, 0x09, 0x42, 0x8c,
    0xdd, 0x16, 0x8f, 0xf6, 0x0d, 0xef, 0x93, 0x03, 0x5d, 0x1f, 0x97, 0xab, 0x4c, 0xa3, 0x62, 0xd1,
    0x0f, 0x8b, 0x4f, 0x50, 0xcd, 0xa2, 0x3b, 0xe7, 0xa7, 0x43, 0x6b, 0xd6, 0x3a, 0xf3, 0x4b, 0x0a,
    0xf9, 0x98, 0x45, 0x41, 0x92, 0x60, 0xec, 0xd6, 0x06, 0x75, 0xf2, 0x61, 0x20, 0x6b, 0xdd, 0x84,
    0x17, 0x32, 0x5d, 0xcd, 0xdc, 0x3a, 0x8f, 0x4a, 0xcc, 0x1e, 0xed, 0xac, 0x51, 0xcc, 0x2c, 0xf6,
    0x7e, 0xd4, 0xe0, 0xb4, 0xaa, 0xc8, 0xc7, 0x34, 0xce, 0xaf, 0x2a, 0xeb, 0x15, 0x80, 0xd1, 0xf1,
    0x1a, 0x73, 0x56, 0xd6, 0x39, 0xbf, 0x5f, 0x2e, 0xf8, 0x6b, 0xfb, 0xe1, 0xf3, 0x92, 0xfc, 0x22,
    0xd1, 0x2a, 0xf7, 0x2b, 0xab, 0x85, 0x07, 0xdd, 0x08, 0x03, 0xa5, 0x43, 0x0c, 0x34, 0xed, 0x46,
    0x97, 0xd2, 0xf4, 0x0f, 0x7a, 0x62, 0x00, 0x1d, 0x6b, 0x80, 0x28, 0x0d, 0xa6, 0x54, 0x5b, 0xe2,
    0x2e, 0x5c, 0xdf, 0xa4, 0xf3, 0x40, 0x63, 0x49, 0x6a, 0x32, 0xe9, 0x52, 0x97, 0x8e, 0x46, 0x91,
    0x24, 0x62, 0xb4, 0x98, 0x51, 0xe2, 0xb3, 0xa7, 0x70, 0xe3, 0x92, 0x50, 0x71, 0x02, 0xf5, 0xf7,
    0x98, 0x9b, 0x78, 0x51, 0x29, 0x6e, 0x42, 0x73, 0x1b, 0xc4, 0x1b, 0xe4, 0x5c, 0x35, 0xfd, 0x02,
    0x3c, 0xa9, 0x3d, 0x76, 0xbc, 0xdf, 0x0e, 0xf4, 0x5e, 0x5a, 0xd5, 0x7c, 0x4c, 0x03, 0xe8, 0x44,
    0x18, 0xc4, 0x3a, 0x9a, 0xe0, 0x16, 0x63, 0xb3, 0x4b, 0xb5, 0x45, 0xfb, 0x2f, 0xd2, 0xcc, 0xec,
    0x45, 0x25, 0x09, 0xfc, 0x10, 0xa8, 0x84, 0x6f, 0xc2, 0xe4, 0xd3, 0x62, 0xca, 0x1a, 0x2a, 0x25,
    0xf9, 0x60, 0x4d, 0x40, 0xf7, 0x0d, 0x9a, 0x4a, 0x1d, 0x15, 0xc7, 0xce, 0xee, 0xbb, 0xfe, 0xbe,
    0x21, 0xee, 0x40, 0xaf, 0x04, 0xef, 0x56, 0x1f, 0xd1, 0xb8, 0x3f, 0x6c, 0x1e, 0xe4, 0xa0, 0xfd,
    0xdf, 0x1c, 0x78, 0x5f, 0x1c, 0x68, 0xbf, 0xc9, 0x96, 0xfc, 0x81, 0x07, 0xd0, 0xce, 0xd4, 0x8c,
    0x48, 0xff, 0x4e, 0x90, 0x65, 0x50, 0xae, 0xd7, 0xd9, 0xf2, 0x22, 0x4f, 0xd1, 0xbe, 0x4f, 0x03,
    0xe9, 0x7c, 0xb5, 0x09, 0x35, 0xf5, 0x0b, 0x40, 0xaf, 0x2f, 0x8b, 0xd6, 0xfb, 0xbb, 0x37, 0x48,
    0x08, 0x68, 0x84, 0x72, 0x9e, 0x1b, 0xbb, 0x5c, 0x17, 0x6f, 0x7f, 0x9e, 0x4f, 0x30, 0x31, 0x1e,
    0x2d, 0x7c, 0x76, 0xf2, 0x6d, 0xd7, 0x77, 0xbe, 0xef, 0xfa, 0xce, 0xcf, 0x5d, 0xdf, 0xf9, 0xfc,
    0xab, 0x7f, 0x2d, 0x6c, 0xf1, 0x0f, 0xe6, 0xd1, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9f, 0x8a,
    0x04, 0x17, 0x71, 0x04, 0x00, 0x00,
    // 593 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xd1, 0x8a, 0xd3, 0x5c,
    0x10, 0xfe, 0xd3, 0x36, 0xcd, 0xee, 0xb4, 0xdd, 0x7f, 0xf7, 0xb0, 0x2c, 0x61, 0x91, 0x52, 0x82,
    0x2c, 0xa2, 0xd8, 0x8b, 0xf5, 0x46, 0x04, 0x11, 0xb4, 0x82, 0x5d, 0xab, 0x94, 0xb8, 0x2f, 0x90,
    0xb4, 0xd3, 0xa6, 0x98, 0x26, 0xe1, 0x24, 0xad, 0xc4, 0x97, 0xf0, 0xd6, 0x87, 0x10, 0x9f, 0xc3,
    0x4b, 0x1f, 0x41, 0xea, 0x0b, 0x78, 0xeb, 0x9d, 0xcc, 0xe4, 0x9c, 0x24, 0x5d, 0x11, 0xf1, 0x6e,
    0xbe, 0x99, 0xef, 0xcc, 0xf9, 0xe6, 0xcc, 0x97, 0x40, 0x2f, 0x8d, 0x17, 0x99, 0xbf, 0x49, 0x87,
    0x89, 0x8c, 0xb3, 0x58, 0x58, 0x0a, 0x3a, 0x9f, 0x0d, 0x38, 0x98, 0xca, 0x78, 0x36, 0x8e, 0x16,
    0xb1, 0x38, 0x87, 0x83, 0x14, 0xe5, 0x16, 0xe5, 0x78, 0x64, 0x1b, 0x03, 0xe3, 0xce, 0xa1, 0x5b,
    0x62, 0x61, 0x83, 0xe5, 0xc7, 0x9e, 0x9c, 0x8f, 0x47, 0x76, 0x83, 0x4b, 0x1a, 0xd6, 0x4e, 0x4d,
    0xed, 0xe6, 0xde, 0xa9, 0x29, 0xd5, 0x12, 0x19, 0xcf, 0x5e, 0x7b, 0x6b, 0xb4, 0x5b, 0x45, 0x4d,
    0x63, 0x71, 0x06, 0x6d, 0x8a, 0xc7, 0x23, 0xdb, 0xe4, 0x8a, 0x42, 0xe2, 0x16, 0x1c, 0x52, 0x34,
    0xf1, 0x7c, 0x0c, 0xed, 0x36, 0x97, 0xaa, 0x84, 0xf3, 0xc1, 0x80, 0xae, 0x8b, 0xcb, 0x55, 0x9a,
    0xa1, 0x64, 0xd1, 0xf7, 0x8b, 0x2b, 0x28, 0x66, 0xd1, 0x9d, 0xcb, 0x93, 0xa1, 0x1e, 0x56, 0x4f,
    0xe6, 0x96, 0x14, 0x9a, 0x63, 0x16, 0x78, 0x51, 0x84, 0xa1, 0xdd, 0x18, 0x34, 0x69, 0x0e, 0x05,
    0x59, 0xeb, 0xc6, 0xbf, 0x8e, 0x93, 0xd5, 0xcc, 0x6e, 0x72, 0xa9, 0xc4, 0x3c, 0xa3, 0xae, 0xb5,
    0x8a, 0x9a, 0xc6, 0xce, 0xcf, 0x06, 0x9c, 0xd4, 0x15, 0xb9, 0x98, 0x84, 0xf9, 0xbf, 0xca, 0xba,
    0x02, 0x50, 0x3a, 0x5e, 0x62, 0xce, 0xca, 0x3a, 0x97, 0x77, 0xcb, 0x03, 0xbf, 0xb5, 0x1f, 0x3e,
    0x2b, 0xc9, 0xcf, 0xa3, 0x4c, 0xe6, 0x6e, 0xed, 0xb4, 0x70, 0xa0, 0x1b, 0xa0, 0x27, 0x33, 0x1f,
    0xbd, 0x8c, 0xba, 0xd1, 0x52, 0x4c, 0x77, 0x2f, 0x27, 0x06, 0xd0, 0xd1, 0x03, 0x10, 0xa5, 0xc5,
    0x94, 0x7a, 0x4a, 0x5c, 0xc0, 0xd1, 0x26, 0x99, 0x7b, 0x19, 0x96, 0x24, 0x93, 0x49, 0x37, 0xb2,
    0xf4, 0x34, 0x92, 0x24, 0x11, 0xa3, 0xcd, 0x8c, 0x12, 0xd3, 0x2d, 0x4b, 0xcc, 0xca, 0x06, 0x56,
    0x71, 0x4b, 0x2d, 0x75, 0xfe, 0x18, 0xfe, 0xbf, 0x31, 0x8a, 0x38, 0x86, 0xe6, 0x5b, 0xcc, 0x95,
    0x01, 0x29, 0x14, 0xa7, 0x60, 0x6e, 0xbd, 0x70, 0x83, 0xec, 0x3c, 0xd3, 0x2d, 0xc0, 0xa3, 0xc6,
    0x43, 0xc3, 0x79, 0x02, 0x87, 0xdc, 0x8a, 0xdf, 0xf0, 0x14, 0xcc, 0x8c, 0x37, 0x54, 0x1c, 0x2d,
    0x00, 0xd9, 0x89, 0x83, 0xeb, 0x3c, 0x41, 0x65, 0xdd, 0x2a, 0xe1, 0x5c, 0xc1, 0x51, 0xd9, 0xa0,
    0x58, 0xdc, 0x05, 0xb4, 0x56, 0xd5, 0xd2, 0x44, 0xb9, 0x83, 0x8a, 0xc6, 0x75, 0x2d, 0xb3, 0x90,
    0x44, 0xa1, 0xf3, 0xc3, 0x80, 0xde, 0x0b, 0xfd, 0xc8, 0xac, 0x68, 0x00, 0x9d, 0x00, 0xbd, 0x30,
    0x0b, 0x26, 0xb8, 0xc5, 0x50, 0xe9, 0xaa, 0xa7, 0xa8, 0xcb, 0x22, 0x49, 0x75, 0x97, 0x45, 0x92,
    0xd2, 0x7b, 0xbe, 0xf3, 0x64, 0xc4, 0xc6, 0x51, 0x9f, 0x93, 0xc6, 0x34, 0x0b, 0x4a, 0x19, 0xb3,
    0x0f, 0xd4, 0xf7, 0x54, 0x25, 0xa8, 0x1a, 0x67, 0x41, 0xe1, 0x12, 0x5e, 0x56, 0xd7, 0xad, 0x12,
    0xe2, 0x36, 0xf4, 0x4a, 0xf0, 0x66, 0xf5, 0x1e, 0xd5, 0xb2, 0xf6, 0x93, 0x7b, 0xb6, 0xb5, 0xfe,
    0x6a, 0x5b, 0xe7, 0x93, 0x01, 0xd6, 0xab, 0x74, 0xc9, 0x17, 0xdc, 0x03, 0x2b, 0x95, 0x33, 0x22,
    0xfd, 0xd9, 0xf0, 0x9a, 0x41, 0x9f, 0xe1, 0x3a, 0x5d, 0xd6, 0x76, 0xa2, 0x61, 0xb5, 0xc5, 0x66,
    0x7d, 0x8b, 0x67, 0xd0, 0x4e, 0x83, 0x75, 0x65, 0x55, 0x85, 0x84, 0x80, 0x96, 0x1f, 0xcf, 0x73,
    0x35, 0x2e, 0xc7, 0xc5, 0xaf, 0x6a, 0x9e, 0x4f, 0x30, 0x52, 0x33, 0x6a, 0xf8, 0xf4, 0xf8, 0xcb,
    0xae, 0x6f, 0x7c, 0xdd, 0xf5, 0x8d, 0x6f, 0xbb, 0xbe, 0xf1, 0xf1, 0x7b, 0xff, 0x3f, 0xbf, 0xcd,
    0xff, 0xc3, 0x07, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, 0x65, 0x52, 0xb8, 0x9f, 0x20, 0x05, 0x00,
    0x00,
}
func (m *ProcInfo) Marshal() (dAtA []byte, err error) {
@@ -692,6 +816,80 @@
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.ReplyKey))
    }
    if m.GetTopicKey != 0 {
        dAtA[i] = 0x38
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.GetTopicKey))
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
    return i, nil
}
func (m *TopicInfo) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
    n, err := m.MarshalTo(dAtA)
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}
func (m *TopicInfo) MarshalTo(dAtA []byte) (int, error) {
    var i int
    _ = i
    var l int
    _ = l
    if len(m.Topic) > 0 {
        dAtA[i] = 0xa
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(len(m.Topic)))
        i += copy(dAtA[i:], m.Topic)
    }
    if len(m.TopicType) > 0 {
        dAtA[i] = 0x12
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(len(m.TopicType)))
        i += copy(dAtA[i:], m.TopicType)
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
    return i, nil
}
func (m *TopicInfoReply) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
    n, err := m.MarshalTo(dAtA)
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}
func (m *TopicInfoReply) MarshalTo(dAtA []byte) (int, error) {
    var i int
    _ = i
    var l int
    _ = l
    if m.Info != nil {
        dAtA[i] = 0xa
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.Info.Size()))
        n3, err3 := m.Info.MarshalTo(dAtA[i:])
        if err3 != nil {
            return 0, err3
        }
        i += n3
    }
    if m.Key != 0 {
        dAtA[i] = 0x10
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.Key))
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
@@ -751,11 +949,11 @@
        dAtA[i] = 0x3a
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.ProcInfo.Size()))
        n3, err3 := m.ProcInfo.MarshalTo(dAtA[i:])
        if err3 != nil {
            return 0, err3
        n4, err4 := m.ProcInfo.MarshalTo(dAtA[i:])
        if err4 != nil {
            return 0, err4
        }
        i += n3
        i += n4
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -782,11 +980,11 @@
        dAtA[i] = 0xa
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.SrcProc.Size()))
        n4, err4 := m.SrcProc.MarshalTo(dAtA[i:])
        if err4 != nil {
            return 0, err4
        n5, err5 := m.SrcProc.MarshalTo(dAtA[i:])
        if err5 != nil {
            return 0, err5
        }
        i += n4
        i += n5
    }
    if len(m.MsgType) > 0 {
        dAtA[i] = 0x12
@@ -930,6 +1128,48 @@
    }
    if m.ReplyKey != 0 {
        n += 1 + sovSoftbus(uint64(m.ReplyKey))
    }
    if m.GetTopicKey != 0 {
        n += 1 + sovSoftbus(uint64(m.GetTopicKey))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
    }
    return n
}
func (m *TopicInfo) Size() (n int) {
    if m == nil {
        return 0
    }
    var l int
    _ = l
    l = len(m.Topic)
    if l > 0 {
        n += 1 + l + sovSoftbus(uint64(l))
    }
    l = len(m.TopicType)
    if l > 0 {
        n += 1 + l + sovSoftbus(uint64(l))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
    }
    return n
}
func (m *TopicInfoReply) Size() (n int) {
    if m == nil {
        return 0
    }
    var l int
    _ = l
    if m.Info != nil {
        l = m.Info.Size()
        n += 1 + l + sovSoftbus(uint64(l))
    }
    if m.Key != 0 {
        n += 1 + sovSoftbus(uint64(m.Key))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
@@ -1708,6 +1948,252 @@
                    break
                }
            }
        case 7:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field GetTopicKey", wireType)
            }
            m.GetTopicKey = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.GetTopicKey |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        default:
            iNdEx = preIndex
            skippy, err := skipSoftbus(dAtA[iNdEx:])
            if err != nil {
                return err
            }
            if skippy < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) > l {
                return io.ErrUnexpectedEOF
            }
            m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
            iNdEx += skippy
        }
    }
    if iNdEx > l {
        return io.ErrUnexpectedEOF
    }
    return nil
}
func (m *TopicInfo) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        preIndex := iNdEx
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return ErrIntOverflowSoftbus
            }
            if iNdEx >= l {
                return io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= uint64(b&0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        fieldNum := int32(wire >> 3)
        wireType := int(wire & 0x7)
        if wireType == 4 {
            return fmt.Errorf("proto: TopicInfo: wiretype end group for non-group")
        }
        if fieldNum <= 0 {
            return fmt.Errorf("proto: TopicInfo: illegal tag %d (wire type %d)", fieldNum, wire)
        }
        switch fieldNum {
        case 1:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
            }
            var stringLen uint64
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                stringLen |= uint64(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            intStringLen := int(stringLen)
            if intStringLen < 0 {
                return ErrInvalidLengthSoftbus
            }
            postIndex := iNdEx + intStringLen
            if postIndex < 0 {
                return ErrInvalidLengthSoftbus
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.Topic = string(dAtA[iNdEx:postIndex])
            iNdEx = postIndex
        case 2:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field TopicType", wireType)
            }
            var stringLen uint64
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                stringLen |= uint64(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            intStringLen := int(stringLen)
            if intStringLen < 0 {
                return ErrInvalidLengthSoftbus
            }
            postIndex := iNdEx + intStringLen
            if postIndex < 0 {
                return ErrInvalidLengthSoftbus
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.TopicType = string(dAtA[iNdEx:postIndex])
            iNdEx = postIndex
        default:
            iNdEx = preIndex
            skippy, err := skipSoftbus(dAtA[iNdEx:])
            if err != nil {
                return err
            }
            if skippy < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) > l {
                return io.ErrUnexpectedEOF
            }
            m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
            iNdEx += skippy
        }
    }
    if iNdEx > l {
        return io.ErrUnexpectedEOF
    }
    return nil
}
func (m *TopicInfoReply) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        preIndex := iNdEx
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return ErrIntOverflowSoftbus
            }
            if iNdEx >= l {
                return io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= uint64(b&0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        fieldNum := int32(wire >> 3)
        wireType := int(wire & 0x7)
        if wireType == 4 {
            return fmt.Errorf("proto: TopicInfoReply: wiretype end group for non-group")
        }
        if fieldNum <= 0 {
            return fmt.Errorf("proto: TopicInfoReply: illegal tag %d (wire type %d)", fieldNum, wire)
        }
        switch fieldNum {
        case 1:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Info", wireType)
            }
            var msglen int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                msglen |= int(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if msglen < 0 {
                return ErrInvalidLengthSoftbus
            }
            postIndex := iNdEx + msglen
            if postIndex < 0 {
                return ErrInvalidLengthSoftbus
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            if m.Info == nil {
                m.Info = &TopicInfo{}
            }
            if err := m.Info.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
                return err
            }
            iNdEx = postIndex
        case 2:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
            }
            m.Key = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.Key |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        default:
            iNdEx = preIndex
            skippy, err := skipSoftbus(dAtA[iNdEx:])
softbus.proto
@@ -26,6 +26,17 @@
    int32 subTopicKey = 4;
    int32 updateTopicKey = 5;
    int32 replyKey = 6;
    int32 getTopicKey = 7;
}
message TopicInfo{
    string topic = 1;
    string topicType = 2;
}
message TopicInfoReply{
    TopicInfo info = 1;
    int32 key = 2;
}
message HeartbeatInfo {