zhangmeng
2020-07-30 1baf50119d7d19b276b132f6837e86b396f186ef
update dgram socket
3个文件已修改
812 ■■■■ 已修改文件
library.go 267 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbus.pb.go 544 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
softbus.proto 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
library.go
@@ -1,6 +1,7 @@
package softbus
import (
    "context"
    "fmt"
    "time"
@@ -8,86 +9,193 @@
)
const (
    // HeartbeatKey fixed key for hb to servermanager
    HeartbeatKey = 11
    // RegKey fixed key for hb to servermanager
    RegKey = 12
    // UpKey fixed key for update topic to servermanager
    UpKey = 13
    // GetTopicInfoTypeTopic topic
    GetTopicInfoTypeTopic = "gettopic"
    // GetTopicInfoTypeChannel channel
    GetTopicInfoTypeChannel = "getchannel"
)
type shmKeyAndProcInfo struct {
type subOReply struct {
    sock *DgramSocket
    info *ProcInfo
}
type sockRe struct {
    sock *DgramSocket
    peer int
}
// TransInfo 传输的数据和必要的记录
type TransInfo struct {
    info *MsgInfo
    port int
}
// Handle handler
type Handle struct {
    m          map[string]*shmKeyAndProcInfo
    sockWorker *DgramSocket
    ctx context.Context
    // 创建channel对应的reply,等待读取其中的内容,server
    // 其中必须有一个作为Request函数的server
    m map[string]*subOReply
    // 创建心跳连接,client,仅发送心跳信息
    sockHB *sockRe
    // 创建更新主题连接,client,仅发送主题更新信息
    sockUp *sockRe
    // 创建订阅的socket
    sockSub *subOReply
    // 创建reply服务Request函数
    sockRep *sockRe
    // 创建一个万能socket发送给任意server
    sockWorker *sockRe
    chSub   chan TransInfo
    chReply chan TransInfo
}
func garbageCollect(ctx context.Context, h *Handle) {
    <-ctx.Done()
    for _, v := range h.m {
        v.sock.Close()
    }
    h.sockHB.sock.Close()
    h.sockUp.sock.Close()
    h.sockSub.sock.Close()
    h.sockRep.sock.Close()
    h.sockWorker.sock.Close()
}
func recvRoutine(ctx context.Context, sock *DgramSocket, ch chan<- TransInfo) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            if data, peer, err := sock.RecvFrom(); err == nil {
                var info *MsgInfo
                if err := proto.Unmarshal(data, info); err == nil {
                    ch <- TransInfo{
                        info: info,
                        port: peer,
                    }
                }
            }
            time.Sleep(50 * time.Millisecond)
        }
    }
}
// Register reg
func Register(info *RegisterInfo) *Handle {
    m := make(map[string]*shmKeyAndProcInfo)
func Register(ctx context.Context, info *RegisterInfo) *Handle {
    m := make(map[string]*subOReply)
    // 首先请求一堆key
    sockReg := OpenDgramSocket()
    if sockReg == nil {
        return nil
    }
    defer sockReg.Close()
    var msg, rdata []byte
    var err error
loop:
    for {
        if msg == nil {
            if msg, err = proto.Marshal(info); err != nil {
                time.Sleep(100 * time.Millisecond)
                continue
        select {
        case <-ctx.Done():
            return nil
        default:
            if msg == nil {
                if msg, err = proto.Marshal(info); err != nil {
                    time.Sleep(100 * time.Millisecond)
                    continue
                }
            }
            if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
                break loop
            }
            time.Sleep(100 * time.Millisecond)
        }
    }
    // 得到key,赋值
    var regReply RegisterInfoReply
    if err := proto.Unmarshal(rdata, &regReply); err != nil {
        return nil
    }
    // 收发req/rep channel
    for _, v := range info.Channel {
        if k, ok := regReply.ChannelKey[v]; ok {
            s := OpenDgramSocket()
            s.Bind(int(k))
            m[v] = &subOReply{
                sock: s,
                info: info.ProcInfo,
            }
        }
        if rdata, err = sockReg.SendAndRecv(msg, RegKey); err == nil {
            break
        }
        time.Sleep(100 * time.Millisecond)
    }
    sockReg.Close()
    // 得到key,赋值
    if rdata != nil {
    }
    // 只收不发
    for _, v := range info.Channel {
        s := OpenDgramSocket()
        m[v] = &shmKeyAndProcInfo{
            sock: s,
            info: info.ProcInfo,
        }
    }
    // pub/sub使用同一个socket
    pbs := OpenDgramSocket()
    for _, v := range info.PubTopic {
        m[v] = &shmKeyAndProcInfo{
            sock: pbs,
            info: info.ProcInfo,
        }
    chSize := 5
    chSub := make(chan TransInfo, chSize)
    chReply := make(chan TransInfo, chSize)
    // heartbeat使用一个socket
    sockHB := OpenDgramSocket()
    hbr := &sockRe{
        sock: sockHB,
        peer: int(regReply.HeartbeatKey),
    }
    for _, v := range info.SubTopic {
        m[v] = &shmKeyAndProcInfo{
            sock: pbs,
            info: info.ProcInfo,
        }
    // 更新主题使用一个
    sockUp := OpenDgramSocket()
    upr := &sockRe{
        sock: sockUp,
        peer: int(regReply.UpdateTopicKey),
    }
    s := OpenDgramSocket()
    return &Handle{
    // sub使用一个socket
    sockSub := OpenDgramSocket()
    sockSub.Bind(int(regReply.SubTopicKey))
    // 启动接收线程
    go recvRoutine(ctx, sockSub, chSub)
    sub := &subOReply{
        sock: sockSub,
        info: info.ProcInfo,
    }
    // reply使用一个,服务Request
    sockReply := OpenDgramSocket()
    sockReply.Bind(int(regReply.ReplyKey))
    // 启动接收线程
    go recvRoutine(ctx, sockReply, chSub)
    rer := &sockRe{
        sock: sockReply,
        peer: -1,
    }
    // 万能socket,仅作为客户端使用
    sockW := OpenDgramSocket()
    swr := &sockRe{
        sock: sockW,
        peer: -1,
    }
    handle := &Handle{
        ctx:        ctx,
        m:          m,
        sockWorker: s,
        sockHB:     hbr,
        sockUp:     upr,
        sockSub:    sub,
        sockRep:    rer,
        sockWorker: swr,
        chSub:      chSub,
        chReply:    chReply,
    }
    go garbageCollect(ctx, handle)
    return handle
}
// GetTopicInfo get topic info
@@ -98,8 +206,8 @@
    return -1
}
func (h *Handle) send2(data []byte, key int, logID string) error {
    if r := h.sockWorker.SendTo(data, key); r != 0 {
func (h *Handle) send2(sr *sockRe, data []byte, logID string) error {
    if r := sr.sock.SendTo(data, sr.peer); r != 0 {
        return fmt.Errorf("%s SendTo Failed: %d", logID, r)
    }
    return nil
@@ -109,7 +217,7 @@
func (h *Handle) HeartBeat(info *HeartbeatInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(msg, HeartbeatKey, "HeartBeat")
        return h.send2(h.sockHB, msg, "HeartBeat")
    }
    return err
}
@@ -118,13 +226,70 @@
func (h *Handle) SendOnly(key int, info *MsgInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(msg, key, "SendOnly/Pub")
        if r := h.sockWorker.sock.SendTo(msg, key); r != 0 {
            return fmt.Errorf("SendOnly Failed: %d", r)
        }
    }
    return err
}
// Pub func
func (h *Handle) Pub(info *MsgInfo) error {
    // return h.SendOnly(PubKey, info)
    return nil
    msg, err := proto.Marshal(info)
    if err == nil {
        return h.send2(h.sockUp, msg, "Pub")
    }
    return err
}
// Request req sync
func (h *Handle) Request(key int, info *MsgInfo) *MsgInfo {
    msg, err := proto.Marshal(info)
    if err != nil {
        return nil
    }
    // 同步接口,需要等待返回值
    var ret *MsgInfo
loop:
    for {
        select {
        case <-h.ctx.Done():
            return nil
        default:
            if data, err := h.sockWorker.sock.SendAndRecv(msg, key); err == nil {
                if err := proto.Unmarshal(data, ret); err == nil {
                    break loop
                }
            }
            time.Sleep(100 * time.Millisecond)
        }
    }
    return ret
}
// Reply request
func (h *Handle) Reply(key int, info *MsgInfo) error {
    msg, err := proto.Marshal(info)
    if err == nil {
        if r := h.sockRep.sock.SendTo(msg, key); r != 0 {
            return fmt.Errorf("Reply Failed: %d", r)
        }
    }
    return err
}
// GetMesg get mesg for sub or reply
func (h *Handle) GetMesg() (subMsg *MsgInfo, replyMsg *MsgInfo, replyKey int) {
    if len(h.chSub) > 1 {
        m := <-h.chSub
        subMsg = m.info
    }
    if len(h.chReply) > 1 {
        m := <-h.chReply
        replyMsg = m.info
        replyKey = m.port
    }
    return
}
softbus.pb.go
@@ -180,6 +180,93 @@
    return nil
}
type RegisterInfoReply struct {
    ProcInfo             *ProcInfo        `protobuf:"bytes,1,opt,name=procInfo,proto3" json:"procInfo,omitempty"`
    ChannelKey           map[string]int32 `protobuf:"bytes,2,rep,name=channelKey,proto3" json:"channelKey,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
    HeartbeatKey         int32            `protobuf:"varint,3,opt,name=heartbeatKey,proto3" json:"heartbeatKey,omitempty"`
    SubTopicKey          int32            `protobuf:"varint,4,opt,name=subTopicKey,proto3" json:"subTopicKey,omitempty"`
    UpdateTopicKey       int32            `protobuf:"varint,5,opt,name=updateTopicKey,proto3" json:"updateTopicKey,omitempty"`
    ReplyKey             int32            `protobuf:"varint,6,opt,name=replyKey,proto3" json:"replyKey,omitempty"`
    XXX_NoUnkeyedLiteral struct{}         `json:"-"`
    XXX_unrecognized     []byte           `json:"-"`
    XXX_sizecache        int32            `json:"-"`
}
func (m *RegisterInfoReply) Reset()         { *m = RegisterInfoReply{} }
func (m *RegisterInfoReply) String() string { return proto.CompactTextString(m) }
func (*RegisterInfoReply) ProtoMessage()    {}
func (*RegisterInfoReply) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{2}
}
func (m *RegisterInfoReply) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
}
func (m *RegisterInfoReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    if deterministic {
        return xxx_messageInfo_RegisterInfoReply.Marshal(b, m, deterministic)
    } else {
        b = b[:cap(b)]
        n, err := m.MarshalTo(b)
        if err != nil {
            return nil, err
        }
        return b[:n], nil
    }
}
func (m *RegisterInfoReply) XXX_Merge(src proto.Message) {
    xxx_messageInfo_RegisterInfoReply.Merge(m, src)
}
func (m *RegisterInfoReply) XXX_Size() int {
    return m.Size()
}
func (m *RegisterInfoReply) XXX_DiscardUnknown() {
    xxx_messageInfo_RegisterInfoReply.DiscardUnknown(m)
}
var xxx_messageInfo_RegisterInfoReply proto.InternalMessageInfo
func (m *RegisterInfoReply) GetProcInfo() *ProcInfo {
    if m != nil {
        return m.ProcInfo
    }
    return nil
}
func (m *RegisterInfoReply) GetChannelKey() map[string]int32 {
    if m != nil {
        return m.ChannelKey
    }
    return nil
}
func (m *RegisterInfoReply) GetHeartbeatKey() int32 {
    if m != nil {
        return m.HeartbeatKey
    }
    return 0
}
func (m *RegisterInfoReply) GetSubTopicKey() int32 {
    if m != nil {
        return m.SubTopicKey
    }
    return 0
}
func (m *RegisterInfoReply) GetUpdateTopicKey() int32 {
    if m != nil {
        return m.UpdateTopicKey
    }
    return 0
}
func (m *RegisterInfoReply) GetReplyKey() int32 {
    if m != nil {
        return m.ReplyKey
    }
    return 0
}
type HeartbeatInfo struct {
    HealthLevel          string   `protobuf:"bytes,1,opt,name=healthLevel,proto3" json:"healthLevel,omitempty"`
    Fps                  int32    `protobuf:"varint,2,opt,name=fps,proto3" json:"fps,omitempty"`
@@ -196,7 +283,7 @@
func (m *HeartbeatInfo) String() string { return proto.CompactTextString(m) }
func (*HeartbeatInfo) ProtoMessage()    {}
func (*HeartbeatInfo) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{2}
    return fileDescriptor_61abf516b68179fd, []int{3}
}
func (m *HeartbeatInfo) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
@@ -283,7 +370,7 @@
func (m *MsgInfo) String() string { return proto.CompactTextString(m) }
func (*MsgInfo) ProtoMessage()    {}
func (*MsgInfo) Descriptor() ([]byte, []int) {
    return fileDescriptor_61abf516b68179fd, []int{3}
    return fileDescriptor_61abf516b68179fd, []int{4}
}
func (m *MsgInfo) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
@@ -357,6 +444,8 @@
func init() {
    proto.RegisterType((*ProcInfo)(nil), "softbus.ProcInfo")
    proto.RegisterType((*RegisterInfo)(nil), "softbus.RegisterInfo")
    proto.RegisterType((*RegisterInfoReply)(nil), "softbus.RegisterInfoReply")
    proto.RegisterMapType((map[string]int32)(nil), "softbus.RegisterInfoReply.ChannelKeyEntry")
    proto.RegisterType((*HeartbeatInfo)(nil), "softbus.HeartbeatInfo")
    proto.RegisterType((*MsgInfo)(nil), "softbus.MsgInfo")
}
@@ -364,33 +453,40 @@
func init() { proto.RegisterFile("softbus.proto", fileDescriptor_61abf516b68179fd) }
var fileDescriptor_61abf516b68179fd = []byte{
    // 411 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0x41, 0x8e, 0xd3, 0x30,
    0x14, 0x86, 0xf1, 0xa4, 0x69, 0x66, 0x3c, 0xad, 0x54, 0x2c, 0x84, 0x2c, 0x84, 0xa2, 0x2a, 0x62,
    0x81, 0x84, 0xe8, 0x02, 0x6e, 0x80, 0xba, 0x20, 0xa2, 0xa0, 0x2a, 0xf4, 0x02, 0x4e, 0xfa, 0xda,
    0x54, 0x4a, 0xe3, 0xc8, 0x76, 0x8b, 0xc2, 0x25, 0xd8, 0x72, 0x08, 0xc4, 0x1d, 0xd8, 0xb1, 0xe4,
    0x08, 0xa8, 0x5c, 0x04, 0xf9, 0xc5, 0x4e, 0xdb, 0xc5, 0xec, 0xde, 0xf7, 0xff, 0xb1, 0xfc, 0xfe,
    0x3f, 0xa6, 0x63, 0x2d, 0x37, 0x26, 0x3f, 0xe8, 0x59, 0xa3, 0xa4, 0x91, 0x2c, 0x72, 0x98, 0xfc,
    0x24, 0xf4, 0x76, 0xa9, 0x64, 0x91, 0xd6, 0x1b, 0xc9, 0x9e, 0xd1, 0x5b, 0x0d, 0xea, 0x08, 0x2a,
    0x9d, 0x73, 0x32, 0x25, 0x2f, 0xef, 0xb2, 0x9e, 0x19, 0xa7, 0x51, 0x2e, 0x85, 0x5a, 0xa7, 0x73,
    0x7e, 0x83, 0x96, 0xc7, 0x8b, 0x53, 0x4b, 0x1e, 0x5c, 0x9d, 0x5a, 0x5a, 0xaf, 0x51, 0xb2, 0xf8,
    0x24, 0xf6, 0xc0, 0x07, 0x9d, 0xe7, 0x99, 0x3d, 0xa5, 0x43, 0x3b, 0xa7, 0x73, 0x1e, 0xa2, 0xe3,
    0x88, 0x3d, 0xa7, 0x77, 0x76, 0x5a, 0x88, 0x1c, 0x2a, 0x3e, 0x44, 0xeb, 0x2c, 0x24, 0xdf, 0x08,
    0x1d, 0x65, 0xb0, 0xdd, 0x69, 0x03, 0x0a, 0x97, 0x7e, 0xdd, 0x5d, 0x61, 0x67, 0x5c, 0xfa, 0xfe,
    0xcd, 0xe3, 0x99, 0x0f, 0xeb, 0x93, 0x65, 0xfd, 0x27, 0x36, 0x47, 0x51, 0x8a, 0xba, 0x86, 0x8a,
    0xdf, 0x4c, 0x03, 0x9b, 0xc3, 0x21, 0xee, 0x7a, 0xc8, 0x57, 0xb2, 0xd9, 0x15, 0x3c, 0x40, 0xab,
    0x67, 0xcc, 0xe8, 0xbd, 0x41, 0xe7, 0x79, 0x4e, 0x7e, 0x11, 0x3a, 0x7e, 0x0f, 0x42, 0x99, 0x1c,
    0x84, 0xc1, 0x3b, 0xa6, 0xf4, 0xbe, 0x04, 0x51, 0x99, 0x72, 0x01, 0x47, 0xa8, 0x5c, 0x95, 0x97,
    0x12, 0x9b, 0xd0, 0x60, 0xd3, 0x68, 0x6c, 0x32, 0xcc, 0xec, 0x68, 0x6f, 0xf8, 0x22, 0x54, 0x8d,
    0x31, 0x5c, 0x8b, 0x9e, 0x6d, 0x23, 0xa0, 0x94, 0xc4, 0xbc, 0xae, 0xc6, 0xb3, 0x60, 0x5d, 0x69,
    0xca, 0xae, 0x0d, 0xac, 0x72, 0x94, 0x9d, 0x05, 0xf6, 0x82, 0x8e, 0x7b, 0xf8, 0xbc, 0xfb, 0x0a,
    0xd8, 0x68, 0x98, 0x5d, 0x8b, 0xc9, 0x0f, 0x42, 0xa3, 0x8f, 0x7a, 0x8b, 0x27, 0x5e, 0xd1, 0x48,
    0xab, 0xc2, 0x56, 0xf7, 0x70, 0x9f, 0xfe, 0x0b, 0x5b, 0xe7, 0x5e, 0x6f, 0x57, 0x6d, 0x03, 0xfe,
    0x59, 0x38, 0x64, 0x4f, 0x68, 0x68, 0x5c, 0x97, 0x56, 0xef, 0xc0, 0xfe, 0x74, 0x5d, 0xee, 0x3f,
    0x40, 0x8b, 0x39, 0xc2, 0xcc, 0x11, 0x63, 0x74, 0x90, 0xcb, 0x75, 0xeb, 0xf6, 0xc7, 0xb9, 0x7b,
    0x72, 0xeb, 0x76, 0x01, 0xb5, 0x5b, 0xda, 0xe3, 0xbb, 0xc9, 0xef, 0x53, 0x4c, 0xfe, 0x9c, 0x62,
    0xf2, 0xf7, 0x14, 0x93, 0xef, 0xff, 0xe2, 0x47, 0xf9, 0x10, 0xdf, 0xf5, 0xdb, 0xff, 0x01, 0x00,
    0x00, 0xff, 0xff, 0x70, 0x98, 0xf7, 0xf6, 0xe8, 0x02, 0x00, 0x00,
    // 527 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x8a, 0x13, 0x41,
    0x10, 0x76, 0xf2, 0xbb, 0x5b, 0xd9, 0xe8, 0x6e, 0x23, 0x32, 0x2c, 0x12, 0xc2, 0x20, 0x22, 0x8a,
    0x39, 0xac, 0x17, 0x11, 0xbc, 0x68, 0x04, 0xa3, 0x51, 0xc2, 0xb8, 0x2f, 0xd0, 0x93, 0x54, 0x32,
    0xc1, 0xc9, 0xf4, 0xd0, 0xd3, 0x89, 0x8c, 0x2f, 0xe1, 0xd5, 0x87, 0x10, 0xdf, 0xc1, 0x9b, 0x47,
    0x6f, 0x5e, 0x25, 0xbe, 0x88, 0x54, 0x4d, 0xf7, 0x24, 0x59, 0xf1, 0xe0, 0xad, 0xbe, 0xfa, 0xbe,
    0xaa, 0x7c, 0x55, 0x5d, 0x13, 0xe8, 0xe6, 0x6a, 0x6e, 0xa2, 0x75, 0x3e, 0xc8, 0xb4, 0x32, 0x4a,
    0xb4, 0x2d, 0x0c, 0xbe, 0x7a, 0x70, 0x34, 0xd1, 0x6a, 0x3a, 0x4a, 0xe7, 0x4a, 0x9c, 0xc3, 0x51,
    0x8e, 0x7a, 0x83, 0x7a, 0x34, 0xf4, 0xbd, 0xbe, 0x77, 0xef, 0x38, 0xac, 0xb0, 0xf0, 0xa1, 0x1d,
    0x29, 0xa9, 0x67, 0xa3, 0xa1, 0x5f, 0x63, 0xca, 0xc1, 0xbd, 0xaa, 0x89, 0x5f, 0x3f, 0xa8, 0x9a,
    0x10, 0x97, 0x69, 0x35, 0x7d, 0x2b, 0x57, 0xe8, 0x37, 0x4a, 0xce, 0x61, 0x71, 0x0b, 0x5a, 0x14,
    0x8f, 0x86, 0x7e, 0x93, 0x19, 0x8b, 0xc4, 0x6d, 0x38, 0xa6, 0x68, 0x2c, 0x23, 0x4c, 0xfc, 0x16,
    0x53, 0xbb, 0x44, 0xf0, 0xc9, 0x83, 0x93, 0x10, 0x17, 0xcb, 0xdc, 0xa0, 0x66, 0xd3, 0x0f, 0xcb,
    0x9f, 0xa0, 0x98, 0x4d, 0x77, 0x2e, 0xce, 0x06, 0x6e, 0x58, 0x37, 0x59, 0x58, 0x49, 0x68, 0x8e,
    0x69, 0x2c, 0xd3, 0x14, 0x13, 0xbf, 0xd6, 0xaf, 0xd3, 0x1c, 0x16, 0xb2, 0xd7, 0x75, 0x74, 0xa9,
    0xb2, 0xe5, 0xd4, 0xaf, 0x33, 0x55, 0x61, 0x9e, 0xd1, 0x71, 0x8d, 0x92, 0x73, 0x38, 0xf8, 0x59,
    0x83, 0xb3, 0x7d, 0x47, 0x21, 0x66, 0x49, 0xf1, 0xbf, 0xb6, 0x5e, 0x01, 0x58, 0x1f, 0xaf, 0xb1,
    0x60, 0x67, 0x9d, 0x8b, 0xfb, 0x55, 0xc1, 0x5f, 0xed, 0x07, 0xcf, 0x2b, 0xf1, 0x8b, 0xd4, 0xe8,
    0x22, 0xdc, 0xab, 0x16, 0x01, 0x9c, 0xc4, 0x28, 0xb5, 0x89, 0x50, 0x1a, 0xea, 0x46, 0x8f, 0xd2,
    0x0c, 0x0f, 0x72, 0xa2, 0x0f, 0x1d, 0x37, 0x00, 0x49, 0x1a, 0x2c, 0xd9, 0x4f, 0x89, 0xbb, 0x70,
    0x7d, 0x9d, 0xcd, 0xa4, 0xc1, 0x4a, 0xd4, 0x64, 0xd1, 0x95, 0x2c, 0xad, 0x46, 0x93, 0x25, 0x52,
    0xb4, 0x58, 0x51, 0xe1, 0xf3, 0xa7, 0x70, 0xe3, 0x8a, 0x51, 0x71, 0x0a, 0xf5, 0xf7, 0x58, 0xd8,
    0xf3, 0xa2, 0x50, 0xdc, 0x84, 0xe6, 0x46, 0x26, 0x6b, 0xe4, 0xbb, 0x6a, 0x86, 0x25, 0x78, 0x52,
    0x7b, 0xec, 0x05, 0xdf, 0x3c, 0xe8, 0xbe, 0x74, 0xae, 0x79, 0x4d, 0x7d, 0xe8, 0xc4, 0x28, 0x13,
    0x13, 0x8f, 0x71, 0x83, 0x89, 0xed, 0xb2, 0x9f, 0xa2, 0xfe, 0xf3, 0x2c, 0xb7, 0xbd, 0x28, 0x24,
    0x83, 0x1f, 0xa4, 0x4e, 0xf9, 0x25, 0xec, 0x7d, 0x3a, 0x4c, 0xb7, 0x86, 0x5a, 0x2b, 0x5e, 0xac,
    0x3d, 0xd0, 0x5d, 0x82, 0x58, 0x65, 0xe2, 0x72, 0xed, 0x3c, 0xfd, 0x49, 0xb8, 0x4b, 0x88, 0x3b,
    0xd0, 0xad, 0xc0, 0xbb, 0xe5, 0x47, 0xb4, 0xd3, 0x1f, 0x26, 0x83, 0x2f, 0x1e, 0xb4, 0xdf, 0xe4,
    0x0b, 0xae, 0x78, 0x00, 0xed, 0x5c, 0x4f, 0xe9, 0xf5, 0xff, 0x7d, 0x12, 0x4e, 0x41, 0x87, 0xba,
    0xca, 0x17, 0x97, 0x45, 0x86, 0xee, 0x83, 0xb3, 0x90, 0x16, 0x66, 0xec, 0x95, 0x52, 0xbe, 0x04,
    0xf4, 0x39, 0xe5, 0xf1, 0x6a, 0xf7, 0x98, 0x16, 0x09, 0x01, 0x8d, 0x48, 0xcd, 0x0a, 0xeb, 0x9f,
    0xe3, 0xf2, 0x63, 0x9e, 0x15, 0x63, 0x4c, 0xad, 0x69, 0x07, 0x9f, 0x9d, 0x7e, 0xdf, 0xf6, 0xbc,
    0x1f, 0xdb, 0x9e, 0xf7, 0x6b, 0xdb, 0xf3, 0x3e, 0xff, 0xee, 0x5d, 0x8b, 0x5a, 0xfc, 0x8f, 0xf1,
    0xe8, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0d, 0x4f, 0x6d, 0xbc, 0x42, 0x04, 0x00, 0x00,
}
func (m *ProcInfo) Marshal() (dAtA []byte, err error) {
@@ -526,6 +622,73 @@
    return i, nil
}
func (m *RegisterInfoReply) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
    n, err := m.MarshalTo(dAtA)
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}
func (m *RegisterInfoReply) MarshalTo(dAtA []byte) (int, error) {
    var i int
    _ = i
    var l int
    _ = l
    if m.ProcInfo != nil {
        dAtA[i] = 0xa
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.ProcInfo.Size()))
        n2, err2 := m.ProcInfo.MarshalTo(dAtA[i:])
        if err2 != nil {
            return 0, err2
        }
        i += n2
    }
    if len(m.ChannelKey) > 0 {
        for k, _ := range m.ChannelKey {
            dAtA[i] = 0x12
            i++
            v := m.ChannelKey[k]
            mapSize := 1 + len(k) + sovSoftbus(uint64(len(k))) + 1 + sovSoftbus(uint64(v))
            i = encodeVarintSoftbus(dAtA, i, uint64(mapSize))
            dAtA[i] = 0xa
            i++
            i = encodeVarintSoftbus(dAtA, i, uint64(len(k)))
            i += copy(dAtA[i:], k)
            dAtA[i] = 0x10
            i++
            i = encodeVarintSoftbus(dAtA, i, uint64(v))
        }
    }
    if m.HeartbeatKey != 0 {
        dAtA[i] = 0x18
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.HeartbeatKey))
    }
    if m.SubTopicKey != 0 {
        dAtA[i] = 0x20
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.SubTopicKey))
    }
    if m.UpdateTopicKey != 0 {
        dAtA[i] = 0x28
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.UpdateTopicKey))
    }
    if m.ReplyKey != 0 {
        dAtA[i] = 0x30
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.ReplyKey))
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
    return i, nil
}
func (m *HeartbeatInfo) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
@@ -600,11 +763,11 @@
        dAtA[i] = 0xa
        i++
        i = encodeVarintSoftbus(dAtA, i, uint64(m.SrcProc.Size()))
        n2, err2 := m.SrcProc.MarshalTo(dAtA[i:])
        if err2 != nil {
            return 0, err2
        n3, err3 := m.SrcProc.MarshalTo(dAtA[i:])
        if err3 != nil {
            return 0, err3
        }
        i += n2
        i += n3
    }
    if len(m.MsgType) > 0 {
        dAtA[i] = 0x12
@@ -712,6 +875,42 @@
            l = len(s)
            n += 1 + l + sovSoftbus(uint64(l))
        }
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
    }
    return n
}
func (m *RegisterInfoReply) Size() (n int) {
    if m == nil {
        return 0
    }
    var l int
    _ = l
    if m.ProcInfo != nil {
        l = m.ProcInfo.Size()
        n += 1 + l + sovSoftbus(uint64(l))
    }
    if len(m.ChannelKey) > 0 {
        for k, v := range m.ChannelKey {
            _ = k
            _ = v
            mapEntrySize := 1 + len(k) + sovSoftbus(uint64(len(k))) + 1 + sovSoftbus(uint64(v))
            n += mapEntrySize + 1 + sovSoftbus(uint64(mapEntrySize))
        }
    }
    if m.HeartbeatKey != 0 {
        n += 1 + sovSoftbus(uint64(m.HeartbeatKey))
    }
    if m.SubTopicKey != 0 {
        n += 1 + sovSoftbus(uint64(m.SubTopicKey))
    }
    if m.UpdateTopicKey != 0 {
        n += 1 + sovSoftbus(uint64(m.UpdateTopicKey))
    }
    if m.ReplyKey != 0 {
        n += 1 + sovSoftbus(uint64(m.ReplyKey))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
@@ -1232,6 +1431,285 @@
    }
    return nil
}
func (m *RegisterInfoReply) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        preIndex := iNdEx
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return ErrIntOverflowSoftbus
            }
            if iNdEx >= l {
                return io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= uint64(b&0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        fieldNum := int32(wire >> 3)
        wireType := int(wire & 0x7)
        if wireType == 4 {
            return fmt.Errorf("proto: RegisterInfoReply: wiretype end group for non-group")
        }
        if fieldNum <= 0 {
            return fmt.Errorf("proto: RegisterInfoReply: illegal tag %d (wire type %d)", fieldNum, wire)
        }
        switch fieldNum {
        case 1:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field ProcInfo", wireType)
            }
            var msglen int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                msglen |= int(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if msglen < 0 {
                return ErrInvalidLengthSoftbus
            }
            postIndex := iNdEx + msglen
            if postIndex < 0 {
                return ErrInvalidLengthSoftbus
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            if m.ProcInfo == nil {
                m.ProcInfo = &ProcInfo{}
            }
            if err := m.ProcInfo.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
                return err
            }
            iNdEx = postIndex
        case 2:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field ChannelKey", wireType)
            }
            var msglen int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                msglen |= int(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if msglen < 0 {
                return ErrInvalidLengthSoftbus
            }
            postIndex := iNdEx + msglen
            if postIndex < 0 {
                return ErrInvalidLengthSoftbus
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            if m.ChannelKey == nil {
                m.ChannelKey = make(map[string]int32)
            }
            var mapkey string
            var mapvalue int32
            for iNdEx < postIndex {
                entryPreIndex := iNdEx
                var wire uint64
                for shift := uint(0); ; shift += 7 {
                    if shift >= 64 {
                        return ErrIntOverflowSoftbus
                    }
                    if iNdEx >= l {
                        return io.ErrUnexpectedEOF
                    }
                    b := dAtA[iNdEx]
                    iNdEx++
                    wire |= uint64(b&0x7F) << shift
                    if b < 0x80 {
                        break
                    }
                }
                fieldNum := int32(wire >> 3)
                if fieldNum == 1 {
                    var stringLenmapkey uint64
                    for shift := uint(0); ; shift += 7 {
                        if shift >= 64 {
                            return ErrIntOverflowSoftbus
                        }
                        if iNdEx >= l {
                            return io.ErrUnexpectedEOF
                        }
                        b := dAtA[iNdEx]
                        iNdEx++
                        stringLenmapkey |= uint64(b&0x7F) << shift
                        if b < 0x80 {
                            break
                        }
                    }
                    intStringLenmapkey := int(stringLenmapkey)
                    if intStringLenmapkey < 0 {
                        return ErrInvalidLengthSoftbus
                    }
                    postStringIndexmapkey := iNdEx + intStringLenmapkey
                    if postStringIndexmapkey < 0 {
                        return ErrInvalidLengthSoftbus
                    }
                    if postStringIndexmapkey > l {
                        return io.ErrUnexpectedEOF
                    }
                    mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
                    iNdEx = postStringIndexmapkey
                } else if fieldNum == 2 {
                    for shift := uint(0); ; shift += 7 {
                        if shift >= 64 {
                            return ErrIntOverflowSoftbus
                        }
                        if iNdEx >= l {
                            return io.ErrUnexpectedEOF
                        }
                        b := dAtA[iNdEx]
                        iNdEx++
                        mapvalue |= int32(b&0x7F) << shift
                        if b < 0x80 {
                            break
                        }
                    }
                } else {
                    iNdEx = entryPreIndex
                    skippy, err := skipSoftbus(dAtA[iNdEx:])
                    if err != nil {
                        return err
                    }
                    if skippy < 0 {
                        return ErrInvalidLengthSoftbus
                    }
                    if (iNdEx + skippy) > postIndex {
                        return io.ErrUnexpectedEOF
                    }
                    iNdEx += skippy
                }
            }
            m.ChannelKey[mapkey] = mapvalue
            iNdEx = postIndex
        case 3:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field HeartbeatKey", wireType)
            }
            m.HeartbeatKey = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.HeartbeatKey |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        case 4:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field SubTopicKey", wireType)
            }
            m.SubTopicKey = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.SubTopicKey |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        case 5:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field UpdateTopicKey", wireType)
            }
            m.UpdateTopicKey = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.UpdateTopicKey |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        case 6:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field ReplyKey", wireType)
            }
            m.ReplyKey = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowSoftbus
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.ReplyKey |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        default:
            iNdEx = preIndex
            skippy, err := skipSoftbus(dAtA[iNdEx:])
            if err != nil {
                return err
            }
            if skippy < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) < 0 {
                return ErrInvalidLengthSoftbus
            }
            if (iNdEx + skippy) > l {
                return io.ErrUnexpectedEOF
            }
            m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
            iNdEx += skippy
        }
    }
    if iNdEx > l {
        return io.ErrUnexpectedEOF
    }
    return nil
}
func (m *HeartbeatInfo) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
softbus.proto
@@ -25,6 +25,7 @@
    int32 heartbeatKey = 3;
    int32 subTopicKey = 4;
    int32 updateTopicKey = 5;
    int32 replyKey = 6;
}
message HeartbeatInfo {