龙赣华
2019-05-14 f8beddf554ab00dafa95fbd3cecd2c05e5f0e72f
change json to photobuf
6个文件已修改
3个文件已添加
892 ■■■■■ 已修改文件
camera/camera.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/.gitignore 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/test.pb.go 738 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
protomsg/test.proto 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 78 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -37,11 +37,14 @@
func Taskdolist(cid string, taskid string, data []byte) {
    fmt.Println("======================================")
    fmt.Println()
    //  数据加工(打标签)
    sdkmsg := sdk.SdkData(cid, taskid, data)
    fmt.Println("============================")
    fmt.Println("sdk  打标签: ", cid, taskid, len(data))
    if sdkmsg.Tasklab == nil {
        fmt.Println("cid:%s 没有任务%s", cid, taskid)
        return
    }
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
@@ -82,7 +85,6 @@
            i++
        }
    }
}
func Recv(cameraid string, socket SocketContext) {
@@ -100,7 +102,7 @@
                continue
            } else {
                for _, taskid := range GetAlltask(cameraid) {
                    Taskdolist(cameraid, taskid, msg)
                    go Taskdolist(cameraid, taskid, msg)
                    fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
                }
            }
go.mod
@@ -4,6 +4,7 @@
require (
    github.com/Microsoft/go-winio v0.4.12 // indirect
    github.com/golang/protobuf v1.3.1
    golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -1,5 +1,7 @@
github.com/Microsoft/go-winio v0.4.12 h1:xAfWHN1IrQ0NJ9TBC0KBZoqLjzDTr1ML+4MywiUOryc=
github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
protomsg/.gitignore
New file
@@ -0,0 +1,23 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
protomsg/test.pb.go
New file
@@ -0,0 +1,738 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: test.proto
package protomsg
import (
    fmt "fmt"
    io "io"
    math "math"
    proto "github.com/golang/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type TaskLabel struct {
    Taskid               string   `protobuf:"bytes,1,opt,name=taskid,proto3" json:"taskid,omitempty"`
    Sdkids               []string `protobuf:"bytes,2,rep,name=sdkids,proto3" json:"sdkids,omitempty"`
    Index                int32    `protobuf:"varint,3,opt,name=index,proto3" json:"index,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}
func (m *TaskLabel) Reset()         { *m = TaskLabel{} }
func (m *TaskLabel) String() string { return proto.CompactTextString(m) }
func (*TaskLabel) ProtoMessage()    {}
func (*TaskLabel) Descriptor() ([]byte, []int) {
    return fileDescriptor_c161fcfdc0c3ff1e, []int{0}
}
func (m *TaskLabel) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
}
func (m *TaskLabel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    if deterministic {
        return xxx_messageInfo_TaskLabel.Marshal(b, m, deterministic)
    } else {
        b = b[:cap(b)]
        n, err := m.MarshalTo(b)
        if err != nil {
            return nil, err
        }
        return b[:n], nil
    }
}
func (m *TaskLabel) XXX_Merge(src proto.Message) {
    xxx_messageInfo_TaskLabel.Merge(m, src)
}
func (m *TaskLabel) XXX_Size() int {
    return m.Size()
}
func (m *TaskLabel) XXX_DiscardUnknown() {
    xxx_messageInfo_TaskLabel.DiscardUnknown(m)
}
var xxx_messageInfo_TaskLabel proto.InternalMessageInfo
func (m *TaskLabel) GetTaskid() string {
    if m != nil {
        return m.Taskid
    }
    return ""
}
func (m *TaskLabel) GetSdkids() []string {
    if m != nil {
        return m.Sdkids
    }
    return nil
}
func (m *TaskLabel) GetIndex() int32 {
    if m != nil {
        return m.Index
    }
    return 0
}
type SdkMessage struct {
    Cid                  string     `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"`
    Tasklab              *TaskLabel `protobuf:"bytes,2,opt,name=tasklab,proto3" json:"tasklab,omitempty"`
    Data                 []byte     `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
    XXX_NoUnkeyedLiteral struct{}   `json:"-"`
    XXX_unrecognized     []byte     `json:"-"`
    XXX_sizecache        int32      `json:"-"`
}
func (m *SdkMessage) Reset()         { *m = SdkMessage{} }
func (m *SdkMessage) String() string { return proto.CompactTextString(m) }
func (*SdkMessage) ProtoMessage()    {}
func (*SdkMessage) Descriptor() ([]byte, []int) {
    return fileDescriptor_c161fcfdc0c3ff1e, []int{1}
}
func (m *SdkMessage) XXX_Unmarshal(b []byte) error {
    return m.Unmarshal(b)
}
func (m *SdkMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    if deterministic {
        return xxx_messageInfo_SdkMessage.Marshal(b, m, deterministic)
    } else {
        b = b[:cap(b)]
        n, err := m.MarshalTo(b)
        if err != nil {
            return nil, err
        }
        return b[:n], nil
    }
}
func (m *SdkMessage) XXX_Merge(src proto.Message) {
    xxx_messageInfo_SdkMessage.Merge(m, src)
}
func (m *SdkMessage) XXX_Size() int {
    return m.Size()
}
func (m *SdkMessage) XXX_DiscardUnknown() {
    xxx_messageInfo_SdkMessage.DiscardUnknown(m)
}
var xxx_messageInfo_SdkMessage proto.InternalMessageInfo
func (m *SdkMessage) GetCid() string {
    if m != nil {
        return m.Cid
    }
    return ""
}
func (m *SdkMessage) GetTasklab() *TaskLabel {
    if m != nil {
        return m.Tasklab
    }
    return nil
}
func (m *SdkMessage) GetData() []byte {
    if m != nil {
        return m.Data
    }
    return nil
}
func init() {
    proto.RegisterType((*TaskLabel)(nil), "protomsg.TaskLabel")
    proto.RegisterType((*SdkMessage)(nil), "protomsg.SdkMessage")
}
func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) }
var fileDescriptor_c161fcfdc0c3ff1e = []byte{
    // 195 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
    0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x00, 0x53, 0xb9, 0xc5, 0xe9, 0x4a, 0x81, 0x5c,
    0x9c, 0x21, 0x89, 0xc5, 0xd9, 0x3e, 0x89, 0x49, 0xa9, 0x39, 0x42, 0x62, 0x5c, 0x6c, 0x25, 0x89,
    0xc5, 0xd9, 0x99, 0x29, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x50, 0x1e, 0x48, 0xbc, 0x38,
    0x25, 0x3b, 0x33, 0xa5, 0x58, 0x82, 0x49, 0x81, 0x19, 0x24, 0x0e, 0xe1, 0x09, 0x89, 0x70, 0xb1,
    0x66, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0xb0, 0x06, 0x41, 0x38, 0x4a, 0x89,
    0x5c, 0x5c, 0xc1, 0x29, 0xd9, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x02, 0x5c, 0xcc,
    0xc9, 0x70, 0x03, 0x41, 0x4c, 0x21, 0x5d, 0x2e, 0x76, 0x90, 0xb9, 0x39, 0x89, 0x49, 0x12, 0x4c,
    0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xc2, 0x7a, 0x30, 0xe7, 0xe8, 0xc1, 0xdd, 0x12, 0x04, 0x53, 0x23,
    0x24, 0xc4, 0xc5, 0x92, 0x92, 0x58, 0x92, 0x08, 0xb6, 0x83, 0x27, 0x08, 0xcc, 0x76, 0x12, 0x38,
    0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63,
    0x48, 0x62, 0x03, 0x1b, 0x61, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x45, 0xc6, 0x2c, 0xfc, 0xe6,
    0x00, 0x00, 0x00,
}
func (m *TaskLabel) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
    n, err := m.MarshalTo(dAtA)
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}
func (m *TaskLabel) MarshalTo(dAtA []byte) (int, error) {
    var i int
    _ = i
    var l int
    _ = l
    if len(m.Taskid) > 0 {
        dAtA[i] = 0xa
        i++
        i = encodeVarintTest(dAtA, i, uint64(len(m.Taskid)))
        i += copy(dAtA[i:], m.Taskid)
    }
    if len(m.Sdkids) > 0 {
        for _, s := range m.Sdkids {
            dAtA[i] = 0x12
            i++
            l = len(s)
            for l >= 1<<7 {
                dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
                l >>= 7
                i++
            }
            dAtA[i] = uint8(l)
            i++
            i += copy(dAtA[i:], s)
        }
    }
    if m.Index != 0 {
        dAtA[i] = 0x18
        i++
        i = encodeVarintTest(dAtA, i, uint64(m.Index))
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
    return i, nil
}
func (m *SdkMessage) Marshal() (dAtA []byte, err error) {
    size := m.Size()
    dAtA = make([]byte, size)
    n, err := m.MarshalTo(dAtA)
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}
func (m *SdkMessage) MarshalTo(dAtA []byte) (int, error) {
    var i int
    _ = i
    var l int
    _ = l
    if len(m.Cid) > 0 {
        dAtA[i] = 0xa
        i++
        i = encodeVarintTest(dAtA, i, uint64(len(m.Cid)))
        i += copy(dAtA[i:], m.Cid)
    }
    if m.Tasklab != nil {
        dAtA[i] = 0x12
        i++
        i = encodeVarintTest(dAtA, i, uint64(m.Tasklab.Size()))
        n1, err1 := m.Tasklab.MarshalTo(dAtA[i:])
        if err1 != nil {
            return 0, err1
        }
        i += n1
    }
    if len(m.Data) > 0 {
        dAtA[i] = 0x1a
        i++
        i = encodeVarintTest(dAtA, i, uint64(len(m.Data)))
        i += copy(dAtA[i:], m.Data)
    }
    if m.XXX_unrecognized != nil {
        i += copy(dAtA[i:], m.XXX_unrecognized)
    }
    return i, nil
}
func encodeVarintTest(dAtA []byte, offset int, v uint64) int {
    for v >= 1<<7 {
        dAtA[offset] = uint8(v&0x7f | 0x80)
        v >>= 7
        offset++
    }
    dAtA[offset] = uint8(v)
    return offset + 1
}
func (m *TaskLabel) Size() (n int) {
    if m == nil {
        return 0
    }
    var l int
    _ = l
    l = len(m.Taskid)
    if l > 0 {
        n += 1 + l + sovTest(uint64(l))
    }
    if len(m.Sdkids) > 0 {
        for _, s := range m.Sdkids {
            l = len(s)
            n += 1 + l + sovTest(uint64(l))
        }
    }
    if m.Index != 0 {
        n += 1 + sovTest(uint64(m.Index))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
    }
    return n
}
func (m *SdkMessage) Size() (n int) {
    if m == nil {
        return 0
    }
    var l int
    _ = l
    l = len(m.Cid)
    if l > 0 {
        n += 1 + l + sovTest(uint64(l))
    }
    if m.Tasklab != nil {
        l = m.Tasklab.Size()
        n += 1 + l + sovTest(uint64(l))
    }
    l = len(m.Data)
    if l > 0 {
        n += 1 + l + sovTest(uint64(l))
    }
    if m.XXX_unrecognized != nil {
        n += len(m.XXX_unrecognized)
    }
    return n
}
func sovTest(x uint64) (n int) {
    for {
        n++
        x >>= 7
        if x == 0 {
            break
        }
    }
    return n
}
func sozTest(x uint64) (n int) {
    return sovTest(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *TaskLabel) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        preIndex := iNdEx
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return ErrIntOverflowTest
            }
            if iNdEx >= l {
                return io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= uint64(b&0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        fieldNum := int32(wire >> 3)
        wireType := int(wire & 0x7)
        if wireType == 4 {
            return fmt.Errorf("proto: TaskLabel: wiretype end group for non-group")
        }
        if fieldNum <= 0 {
            return fmt.Errorf("proto: TaskLabel: illegal tag %d (wire type %d)", fieldNum, wire)
        }
        switch fieldNum {
        case 1:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Taskid", wireType)
            }
            var stringLen uint64
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                stringLen |= uint64(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            intStringLen := int(stringLen)
            if intStringLen < 0 {
                return ErrInvalidLengthTest
            }
            postIndex := iNdEx + intStringLen
            if postIndex < 0 {
                return ErrInvalidLengthTest
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.Taskid = string(dAtA[iNdEx:postIndex])
            iNdEx = postIndex
        case 2:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Sdkids", wireType)
            }
            var stringLen uint64
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                stringLen |= uint64(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            intStringLen := int(stringLen)
            if intStringLen < 0 {
                return ErrInvalidLengthTest
            }
            postIndex := iNdEx + intStringLen
            if postIndex < 0 {
                return ErrInvalidLengthTest
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.Sdkids = append(m.Sdkids, string(dAtA[iNdEx:postIndex]))
            iNdEx = postIndex
        case 3:
            if wireType != 0 {
                return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
            }
            m.Index = 0
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                m.Index |= int32(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
        default:
            iNdEx = preIndex
            skippy, err := skipTest(dAtA[iNdEx:])
            if err != nil {
                return err
            }
            if skippy < 0 {
                return ErrInvalidLengthTest
            }
            if (iNdEx + skippy) < 0 {
                return ErrInvalidLengthTest
            }
            if (iNdEx + skippy) > l {
                return io.ErrUnexpectedEOF
            }
            m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
            iNdEx += skippy
        }
    }
    if iNdEx > l {
        return io.ErrUnexpectedEOF
    }
    return nil
}
func (m *SdkMessage) Unmarshal(dAtA []byte) error {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        preIndex := iNdEx
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return ErrIntOverflowTest
            }
            if iNdEx >= l {
                return io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= uint64(b&0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        fieldNum := int32(wire >> 3)
        wireType := int(wire & 0x7)
        if wireType == 4 {
            return fmt.Errorf("proto: SdkMessage: wiretype end group for non-group")
        }
        if fieldNum <= 0 {
            return fmt.Errorf("proto: SdkMessage: illegal tag %d (wire type %d)", fieldNum, wire)
        }
        switch fieldNum {
        case 1:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Cid", wireType)
            }
            var stringLen uint64
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                stringLen |= uint64(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            intStringLen := int(stringLen)
            if intStringLen < 0 {
                return ErrInvalidLengthTest
            }
            postIndex := iNdEx + intStringLen
            if postIndex < 0 {
                return ErrInvalidLengthTest
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.Cid = string(dAtA[iNdEx:postIndex])
            iNdEx = postIndex
        case 2:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Tasklab", wireType)
            }
            var msglen int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                msglen |= int(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if msglen < 0 {
                return ErrInvalidLengthTest
            }
            postIndex := iNdEx + msglen
            if postIndex < 0 {
                return ErrInvalidLengthTest
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            if m.Tasklab == nil {
                m.Tasklab = &TaskLabel{}
            }
            if err := m.Tasklab.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
                return err
            }
            iNdEx = postIndex
        case 3:
            if wireType != 2 {
                return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
            }
            var byteLen int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                byteLen |= int(b&0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if byteLen < 0 {
                return ErrInvalidLengthTest
            }
            postIndex := iNdEx + byteLen
            if postIndex < 0 {
                return ErrInvalidLengthTest
            }
            if postIndex > l {
                return io.ErrUnexpectedEOF
            }
            m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
            if m.Data == nil {
                m.Data = []byte{}
            }
            iNdEx = postIndex
        default:
            iNdEx = preIndex
            skippy, err := skipTest(dAtA[iNdEx:])
            if err != nil {
                return err
            }
            if skippy < 0 {
                return ErrInvalidLengthTest
            }
            if (iNdEx + skippy) < 0 {
                return ErrInvalidLengthTest
            }
            if (iNdEx + skippy) > l {
                return io.ErrUnexpectedEOF
            }
            m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
            iNdEx += skippy
        }
    }
    if iNdEx > l {
        return io.ErrUnexpectedEOF
    }
    return nil
}
func skipTest(dAtA []byte) (n int, err error) {
    l := len(dAtA)
    iNdEx := 0
    for iNdEx < l {
        var wire uint64
        for shift := uint(0); ; shift += 7 {
            if shift >= 64 {
                return 0, ErrIntOverflowTest
            }
            if iNdEx >= l {
                return 0, io.ErrUnexpectedEOF
            }
            b := dAtA[iNdEx]
            iNdEx++
            wire |= (uint64(b) & 0x7F) << shift
            if b < 0x80 {
                break
            }
        }
        wireType := int(wire & 0x7)
        switch wireType {
        case 0:
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return 0, ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return 0, io.ErrUnexpectedEOF
                }
                iNdEx++
                if dAtA[iNdEx-1] < 0x80 {
                    break
                }
            }
            return iNdEx, nil
        case 1:
            iNdEx += 8
            return iNdEx, nil
        case 2:
            var length int
            for shift := uint(0); ; shift += 7 {
                if shift >= 64 {
                    return 0, ErrIntOverflowTest
                }
                if iNdEx >= l {
                    return 0, io.ErrUnexpectedEOF
                }
                b := dAtA[iNdEx]
                iNdEx++
                length |= (int(b) & 0x7F) << shift
                if b < 0x80 {
                    break
                }
            }
            if length < 0 {
                return 0, ErrInvalidLengthTest
            }
            iNdEx += length
            if iNdEx < 0 {
                return 0, ErrInvalidLengthTest
            }
            return iNdEx, nil
        case 3:
            for {
                var innerWire uint64
                var start int = iNdEx
                for shift := uint(0); ; shift += 7 {
                    if shift >= 64 {
                        return 0, ErrIntOverflowTest
                    }
                    if iNdEx >= l {
                        return 0, io.ErrUnexpectedEOF
                    }
                    b := dAtA[iNdEx]
                    iNdEx++
                    innerWire |= (uint64(b) & 0x7F) << shift
                    if b < 0x80 {
                        break
                    }
                }
                innerWireType := int(innerWire & 0x7)
                if innerWireType == 4 {
                    break
                }
                next, err := skipTest(dAtA[start:])
                if err != nil {
                    return 0, err
                }
                iNdEx = start + next
                if iNdEx < 0 {
                    return 0, ErrInvalidLengthTest
                }
            }
            return iNdEx, nil
        case 4:
            return iNdEx, nil
        case 5:
            iNdEx += 4
            return iNdEx, nil
        default:
            return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
        }
    }
    panic("unreachable")
}
var (
    ErrInvalidLengthTest = fmt.Errorf("proto: negative length found during unmarshaling")
    ErrIntOverflowTest   = fmt.Errorf("proto: integer overflow")
)
protomsg/test.proto
New file
@@ -0,0 +1,14 @@
syntax = "proto3";
package protomsg;
message TaskLabel {
    string taskid = 1;
    repeated string sdkids = 2;
    int32 index = 3 ;
}
message SdkMessage {
    string cid = 1;
    TaskLabel tasklab = 2;
    bytes    data =3 ;
}
sdk/sdk.go
@@ -2,17 +2,20 @@
import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    "github.com/long/test/httpclient"
    "github.com/long/test/protomsg"
    "github.com/long/test/tasktag"
    "github.com/long/test/util"
    "time"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "github.com/golang/protobuf/proto"
)
var SocketManage = make(map[string]SocketContext)
@@ -48,7 +51,7 @@
}
//单独处理   es 主题的情况
func es(sdkmsgchan chan SdkMessage) {
func es(sdkmsgchan chan *protomsg.SdkMessage) {
    for data := range sdkmsgchan {
        fmt.Println("this data is finish all sdk! ", data)
    }
@@ -65,7 +68,7 @@
    for key, op := range sdkChanDel {
        if op == "add" {
            SdkMap[key] = make(chan SdkMessage)
            SdkMap[key] = make(chan *protomsg.SdkMessage)
            fmt.Println("创建主题 sdk: ", key)
        } else {
            close(SdkMap[key])
@@ -76,26 +79,25 @@
}
//主题
var SdkMap = make(map[string]chan SdkMessage)
// 发送给算法进程的结构
type SdkMessage struct {
    Cid     string
    Tasklab tasktag.TaskLabel
    Data    []byte
}
var SdkMap = make(map[string]chan *protomsg.SdkMessage)
//sdk数据 加工器
func SdkData(cid string, taskid string, data []byte) (sdkmsg SdkMessage) {
func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
    var sdkmsg = &protomsg.SdkMessage{}
    sdkmsg.Cid = cid
    if _, ok := tasktag.TaskMapLab[taskid]; !ok {
        sdkmsg.Tasklab = nil
        return sdkmsg
    }
    sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
    sdkmsg.Data = data
    return
    return sdkmsg
}
//sdk数据分发器
func SdkSendTopic(sdkmsg SdkMessage) (sdksend string) {
    if sdkmsg.Tasklab.Index < len(sdkmsg.Tasklab.Sdkids) {
func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
    if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
        sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
    } else {
        sdksend = "es"
@@ -115,11 +117,11 @@
// 创建主题
func SdkCreateTopic(sdklist []string) (err error) {
    for _, sdkid := range sdklist {
        SdkMap[sdkid] = make(chan SdkMessage)
        SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
        fmt.Println("create sdk channel:  ", sdkid)
    }
    SdkMap["es"] = make(chan SdkMessage)
    SdkMap["es"] = make(chan *protomsg.SdkMessage)
    fmt.Println("create es channel:  ")
    return nil
@@ -171,6 +173,8 @@
func Recv(socket SocketContext) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
    var repsdkmsg = &protomsg.SdkMessage{}
    for {
        select {
        case <-socket.Context.Done():
@@ -181,45 +185,37 @@
                //fmt.Printf("%s ", err)
                continue
            } else {
                var repsdkmsg SdkMessage
                var reps interface{}
                err = json.Unmarshal(msg, &reps)
                err = proto.Unmarshal(msg, repsdkmsg)
                if err != nil {
                    fmt.Println("unmarshal error: ", err)
                    continue
                }
                switch v := reps.(type) {
                case map[string]interface{}:
                    //调用计算函数, 分发给下一个主题
                    json.Unmarshal(msg, &repsdkmsg)
                    nexttopic := SdkSendTopic(repsdkmsg)
                    SdkMap[nexttopic] <- repsdkmsg
                case string:
                    fmt.Println("this string is: ", v)
                }
                //调用计算函数, 分发给下一个主题
                nexttopic := SdkSendTopic(repsdkmsg)
                SdkMap[nexttopic] <- repsdkmsg
            }
        }
    }
}
func send(sdkid string, socket SocketContext, in chan SdkMessage) {
    var v SdkMessage
    var b []byte
func send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
    var v *protomsg.SdkMessage
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket is close")
        case v = <-in:
            b, _ = json.Marshal(v)
            fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(v.Data))
            if err := socket.Sock.Send(b); err != nil {
            data, err := proto.Marshal(v)
            if err != nil {
                fmt.Println("proto marshal error ", err)
            }
            fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data))
            if err := socket.Sock.Send(data); err != nil {
                fmt.Println("failed send")
            }
            fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(v.Data))
            fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
        }
    }
}
tasktag/tasktag.go
@@ -2,31 +2,23 @@
import (
    "fmt"
    "github.com/long/test/protomsg"
)
/* 任务标签生成器
 *  描述: 用于在数据进入到特定的task时, 给这个数据加上任务标签,这样就能知道算法流程了
 */
type TaskLabel struct {
    Taskid string
    Sdkids []string
    Index  int
}
var TaskMapLab = make(map[string]TaskLabel)
var TaskMapLab = make(map[string]*protomsg.TaskLabel)
//
func Init() {
    var tls []TaskLabel
    var tls []protomsg.TaskLabel
    sdk1 := "812b674b-2375-4589-919a-5c1c3278a972"
    sdk2 := "812b674b-2375-4589-919a-5c1c3278a971"
    task1 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d1", []string{sdk1, sdk2}, 0}
    task1 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d1", Sdkids: []string{sdk1, sdk2}, Index: int32(0)}
    tls = append(tls, task1)
    task2 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d2", []string{sdk2}, 0}
    task2 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d2", Sdkids: []string{sdk1}, Index: int32(0)}
    tls = append(tls, task2)
    GenTasklab(tls)
@@ -38,8 +30,8 @@
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []TaskLabel) {
func GenTasklab(tasklab []protomsg.TaskLabel) {
    for _, value := range tasklab {
        TaskMapLab[value.Taskid] = value
        TaskMapLab[value.Taskid] = &value
    }
}
test
Binary files differ