From f8beddf554ab00dafa95fbd3cecd2c05e5f0e72f Mon Sep 17 00:00:00 2001
From: 龙赣华 <slongertian@gmail.com>
Date: 星期二, 14 五月 2019 13:54:22 +0800
Subject: [PATCH] change json to photobuf
---
protomsg/test.pb.go | 738 +++++++++++++++++++++++++++++++++++++++++++++++++
go.sum | 2
test | 0
tasktag/tasktag.go | 24 -
protomsg/.gitignore | 23 +
protomsg/test.proto | 14
go.mod | 1
sdk/sdk.go | 78 ++--
camera/camera.go | 12
9 files changed, 830 insertions(+), 62 deletions(-)
diff --git a/camera/camera.go b/camera/camera.go
index 8ee4409..7a98f50 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -37,11 +37,14 @@
func Taskdolist(cid string, taskid string, data []byte) {
+ fmt.Println("======================================")
+ fmt.Println()
// 鏁版嵁鍔犲伐(鎵撴爣绛�)
sdkmsg := sdk.SdkData(cid, taskid, data)
- fmt.Println("============================")
-
- fmt.Println("sdk 鎵撴爣绛撅細 ", cid, taskid, len(data))
+ if sdkmsg.Tasklab == nil {
+ fmt.Println("cid:%s 娌℃湁浠诲姟%s", cid, taskid)
+ return
+ }
// 璁$畻鍒嗗彂鐨勪富棰�
SendTopic := sdk.SdkSendTopic(sdkmsg)
@@ -82,7 +85,6 @@
i++
}
}
-
}
func Recv(cameraid string, socket SocketContext) {
@@ -100,7 +102,7 @@
continue
} else {
for _, taskid := range GetAlltask(cameraid) {
- Taskdolist(cameraid, taskid, msg)
+ go Taskdolist(cameraid, taskid, msg)
fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
}
}
diff --git a/go.mod b/go.mod
index 30ab66b..5535460 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@
require (
github.com/Microsoft/go-winio v0.4.12 // indirect
+ github.com/golang/protobuf v1.3.1
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
nanomsg.org/go-mangos v1.4.0
)
diff --git a/go.sum b/go.sum
index e8f84b3..45b71ae 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
github.com/Microsoft/go-winio v0.4.12 h1:xAfWHN1IrQ0NJ9TBC0KBZoqLjzDTr1ML+4MywiUOryc=
github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
diff --git a/protomsg/.gitignore b/protomsg/.gitignore
new file mode 100644
index 0000000..8365624
--- /dev/null
+++ b/protomsg/.gitignore
@@ -0,0 +1,23 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
diff --git a/protomsg/test.pb.go b/protomsg/test.pb.go
new file mode 100644
index 0000000..a52baf2
--- /dev/null
+++ b/protomsg/test.pb.go
@@ -0,0 +1,738 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: test.proto
+
+package protomsg
+
+import (
+ fmt "fmt"
+ io "io"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type TaskLabel struct {
+ Taskid string `protobuf:"bytes,1,opt,name=taskid,proto3" json:"taskid,omitempty"`
+ Sdkids []string `protobuf:"bytes,2,rep,name=sdkids,proto3" json:"sdkids,omitempty"`
+ Index int32 `protobuf:"varint,3,opt,name=index,proto3" json:"index,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *TaskLabel) Reset() { *m = TaskLabel{} }
+func (m *TaskLabel) String() string { return proto.CompactTextString(m) }
+func (*TaskLabel) ProtoMessage() {}
+func (*TaskLabel) Descriptor() ([]byte, []int) {
+ return fileDescriptor_c161fcfdc0c3ff1e, []int{0}
+}
+func (m *TaskLabel) XXX_Unmarshal(b []byte) error {
+ return m.Unmarshal(b)
+}
+func (m *TaskLabel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ if deterministic {
+ return xxx_messageInfo_TaskLabel.Marshal(b, m, deterministic)
+ } else {
+ b = b[:cap(b)]
+ n, err := m.MarshalTo(b)
+ if err != nil {
+ return nil, err
+ }
+ return b[:n], nil
+ }
+}
+func (m *TaskLabel) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_TaskLabel.Merge(m, src)
+}
+func (m *TaskLabel) XXX_Size() int {
+ return m.Size()
+}
+func (m *TaskLabel) XXX_DiscardUnknown() {
+ xxx_messageInfo_TaskLabel.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_TaskLabel proto.InternalMessageInfo
+
+func (m *TaskLabel) GetTaskid() string {
+ if m != nil {
+ return m.Taskid
+ }
+ return ""
+}
+
+func (m *TaskLabel) GetSdkids() []string {
+ if m != nil {
+ return m.Sdkids
+ }
+ return nil
+}
+
+func (m *TaskLabel) GetIndex() int32 {
+ if m != nil {
+ return m.Index
+ }
+ return 0
+}
+
+type SdkMessage struct {
+ Cid string `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"`
+ Tasklab *TaskLabel `protobuf:"bytes,2,opt,name=tasklab,proto3" json:"tasklab,omitempty"`
+ Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *SdkMessage) Reset() { *m = SdkMessage{} }
+func (m *SdkMessage) String() string { return proto.CompactTextString(m) }
+func (*SdkMessage) ProtoMessage() {}
+func (*SdkMessage) Descriptor() ([]byte, []int) {
+ return fileDescriptor_c161fcfdc0c3ff1e, []int{1}
+}
+func (m *SdkMessage) XXX_Unmarshal(b []byte) error {
+ return m.Unmarshal(b)
+}
+func (m *SdkMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ if deterministic {
+ return xxx_messageInfo_SdkMessage.Marshal(b, m, deterministic)
+ } else {
+ b = b[:cap(b)]
+ n, err := m.MarshalTo(b)
+ if err != nil {
+ return nil, err
+ }
+ return b[:n], nil
+ }
+}
+func (m *SdkMessage) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_SdkMessage.Merge(m, src)
+}
+func (m *SdkMessage) XXX_Size() int {
+ return m.Size()
+}
+func (m *SdkMessage) XXX_DiscardUnknown() {
+ xxx_messageInfo_SdkMessage.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_SdkMessage proto.InternalMessageInfo
+
+func (m *SdkMessage) GetCid() string {
+ if m != nil {
+ return m.Cid
+ }
+ return ""
+}
+
+func (m *SdkMessage) GetTasklab() *TaskLabel {
+ if m != nil {
+ return m.Tasklab
+ }
+ return nil
+}
+
+func (m *SdkMessage) GetData() []byte {
+ if m != nil {
+ return m.Data
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*TaskLabel)(nil), "protomsg.TaskLabel")
+ proto.RegisterType((*SdkMessage)(nil), "protomsg.SdkMessage")
+}
+
+func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) }
+
+var fileDescriptor_c161fcfdc0c3ff1e = []byte{
+ // 195 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
+ 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x00, 0x53, 0xb9, 0xc5, 0xe9, 0x4a, 0x81, 0x5c,
+ 0x9c, 0x21, 0x89, 0xc5, 0xd9, 0x3e, 0x89, 0x49, 0xa9, 0x39, 0x42, 0x62, 0x5c, 0x6c, 0x25, 0x89,
+ 0xc5, 0xd9, 0x99, 0x29, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x50, 0x1e, 0x48, 0xbc, 0x38,
+ 0x25, 0x3b, 0x33, 0xa5, 0x58, 0x82, 0x49, 0x81, 0x19, 0x24, 0x0e, 0xe1, 0x09, 0x89, 0x70, 0xb1,
+ 0x66, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0xb0, 0x06, 0x41, 0x38, 0x4a, 0x89,
+ 0x5c, 0x5c, 0xc1, 0x29, 0xd9, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x02, 0x5c, 0xcc,
+ 0xc9, 0x70, 0x03, 0x41, 0x4c, 0x21, 0x5d, 0x2e, 0x76, 0x90, 0xb9, 0x39, 0x89, 0x49, 0x12, 0x4c,
+ 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xc2, 0x7a, 0x30, 0xe7, 0xe8, 0xc1, 0xdd, 0x12, 0x04, 0x53, 0x23,
+ 0x24, 0xc4, 0xc5, 0x92, 0x92, 0x58, 0x92, 0x08, 0xb6, 0x83, 0x27, 0x08, 0xcc, 0x76, 0x12, 0x38,
+ 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63,
+ 0x48, 0x62, 0x03, 0x1b, 0x61, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x45, 0xc6, 0x2c, 0xfc, 0xe6,
+ 0x00, 0x00, 0x00,
+}
+
+func (m *TaskLabel) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *TaskLabel) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ if len(m.Taskid) > 0 {
+ dAtA[i] = 0xa
+ i++
+ i = encodeVarintTest(dAtA, i, uint64(len(m.Taskid)))
+ i += copy(dAtA[i:], m.Taskid)
+ }
+ if len(m.Sdkids) > 0 {
+ for _, s := range m.Sdkids {
+ dAtA[i] = 0x12
+ i++
+ l = len(s)
+ for l >= 1<<7 {
+ dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
+ l >>= 7
+ i++
+ }
+ dAtA[i] = uint8(l)
+ i++
+ i += copy(dAtA[i:], s)
+ }
+ }
+ if m.Index != 0 {
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintTest(dAtA, i, uint64(m.Index))
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *SdkMessage) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *SdkMessage) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ if len(m.Cid) > 0 {
+ dAtA[i] = 0xa
+ i++
+ i = encodeVarintTest(dAtA, i, uint64(len(m.Cid)))
+ i += copy(dAtA[i:], m.Cid)
+ }
+ if m.Tasklab != nil {
+ dAtA[i] = 0x12
+ i++
+ i = encodeVarintTest(dAtA, i, uint64(m.Tasklab.Size()))
+ n1, err1 := m.Tasklab.MarshalTo(dAtA[i:])
+ if err1 != nil {
+ return 0, err1
+ }
+ i += n1
+ }
+ if len(m.Data) > 0 {
+ dAtA[i] = 0x1a
+ i++
+ i = encodeVarintTest(dAtA, i, uint64(len(m.Data)))
+ i += copy(dAtA[i:], m.Data)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func encodeVarintTest(dAtA []byte, offset int, v uint64) int {
+ for v >= 1<<7 {
+ dAtA[offset] = uint8(v&0x7f | 0x80)
+ v >>= 7
+ offset++
+ }
+ dAtA[offset] = uint8(v)
+ return offset + 1
+}
+func (m *TaskLabel) Size() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ l = len(m.Taskid)
+ if l > 0 {
+ n += 1 + l + sovTest(uint64(l))
+ }
+ if len(m.Sdkids) > 0 {
+ for _, s := range m.Sdkids {
+ l = len(s)
+ n += 1 + l + sovTest(uint64(l))
+ }
+ }
+ if m.Index != 0 {
+ n += 1 + sovTest(uint64(m.Index))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *SdkMessage) Size() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ l = len(m.Cid)
+ if l > 0 {
+ n += 1 + l + sovTest(uint64(l))
+ }
+ if m.Tasklab != nil {
+ l = m.Tasklab.Size()
+ n += 1 + l + sovTest(uint64(l))
+ }
+ l = len(m.Data)
+ if l > 0 {
+ n += 1 + l + sovTest(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func sovTest(x uint64) (n int) {
+ for {
+ n++
+ x >>= 7
+ if x == 0 {
+ break
+ }
+ }
+ return n
+}
+func sozTest(x uint64) (n int) {
+ return sovTest(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *TaskLabel) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: TaskLabel: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: TaskLabel: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Taskid", wireType)
+ }
+ var stringLen uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ stringLen |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ intStringLen := int(stringLen)
+ if intStringLen < 0 {
+ return ErrInvalidLengthTest
+ }
+ postIndex := iNdEx + intStringLen
+ if postIndex < 0 {
+ return ErrInvalidLengthTest
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Taskid = string(dAtA[iNdEx:postIndex])
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Sdkids", wireType)
+ }
+ var stringLen uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ stringLen |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ intStringLen := int(stringLen)
+ if intStringLen < 0 {
+ return ErrInvalidLengthTest
+ }
+ postIndex := iNdEx + intStringLen
+ if postIndex < 0 {
+ return ErrInvalidLengthTest
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Sdkids = append(m.Sdkids, string(dAtA[iNdEx:postIndex]))
+ iNdEx = postIndex
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
+ }
+ m.Index = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Index |= int32(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipTest(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthTest
+ }
+ if (iNdEx + skippy) < 0 {
+ return ErrInvalidLengthTest
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *SdkMessage) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: SdkMessage: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: SdkMessage: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Cid", wireType)
+ }
+ var stringLen uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ stringLen |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ intStringLen := int(stringLen)
+ if intStringLen < 0 {
+ return ErrInvalidLengthTest
+ }
+ postIndex := iNdEx + intStringLen
+ if postIndex < 0 {
+ return ErrInvalidLengthTest
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Cid = string(dAtA[iNdEx:postIndex])
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Tasklab", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthTest
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return ErrInvalidLengthTest
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if m.Tasklab == nil {
+ m.Tasklab = &TaskLabel{}
+ }
+ if err := m.Tasklab.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 3:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthTest
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex < 0 {
+ return ErrInvalidLengthTest
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipTest(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthTest
+ }
+ if (iNdEx + skippy) < 0 {
+ return ErrInvalidLengthTest
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func skipTest(dAtA []byte) (n int, err error) {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ wireType := int(wire & 0x7)
+ switch wireType {
+ case 0:
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ iNdEx++
+ if dAtA[iNdEx-1] < 0x80 {
+ break
+ }
+ }
+ return iNdEx, nil
+ case 1:
+ iNdEx += 8
+ return iNdEx, nil
+ case 2:
+ var length int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ length |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if length < 0 {
+ return 0, ErrInvalidLengthTest
+ }
+ iNdEx += length
+ if iNdEx < 0 {
+ return 0, ErrInvalidLengthTest
+ }
+ return iNdEx, nil
+ case 3:
+ for {
+ var innerWire uint64
+ var start int = iNdEx
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowTest
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ innerWire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ innerWireType := int(innerWire & 0x7)
+ if innerWireType == 4 {
+ break
+ }
+ next, err := skipTest(dAtA[start:])
+ if err != nil {
+ return 0, err
+ }
+ iNdEx = start + next
+ if iNdEx < 0 {
+ return 0, ErrInvalidLengthTest
+ }
+ }
+ return iNdEx, nil
+ case 4:
+ return iNdEx, nil
+ case 5:
+ iNdEx += 4
+ return iNdEx, nil
+ default:
+ return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+ }
+ }
+ panic("unreachable")
+}
+
+var (
+ ErrInvalidLengthTest = fmt.Errorf("proto: negative length found during unmarshaling")
+ ErrIntOverflowTest = fmt.Errorf("proto: integer overflow")
+)
diff --git a/protomsg/test.proto b/protomsg/test.proto
new file mode 100644
index 0000000..774a81e
--- /dev/null
+++ b/protomsg/test.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+package protomsg;
+
+message TaskLabel {
+ string taskid = 1;
+ repeated string sdkids = 2;
+ int32 index = 3 ;
+}
+
+message SdkMessage {
+ string cid = 1;
+ TaskLabel tasklab = 2;
+ bytes data =3 ;
+}
diff --git a/sdk/sdk.go b/sdk/sdk.go
index fd91af0..b97d132 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -2,17 +2,20 @@
import (
"context"
- "encoding/json"
"fmt"
+ "time"
+
"github.com/long/test/httpclient"
+ "github.com/long/test/protomsg"
"github.com/long/test/tasktag"
"github.com/long/test/util"
- "time"
"nanomsg.org/go-mangos"
"nanomsg.org/go-mangos/protocol/pair"
"nanomsg.org/go-mangos/transport/ipc"
"nanomsg.org/go-mangos/transport/tcp"
+
+ "github.com/golang/protobuf/proto"
)
var SocketManage = make(map[string]SocketContext)
@@ -48,7 +51,7 @@
}
//鍗曠嫭澶勭悊 es 涓婚鐨勬儏鍐�
-func es(sdkmsgchan chan SdkMessage) {
+func es(sdkmsgchan chan *protomsg.SdkMessage) {
for data := range sdkmsgchan {
fmt.Println("this data is finish all sdk! ", data)
}
@@ -65,7 +68,7 @@
for key, op := range sdkChanDel {
if op == "add" {
- SdkMap[key] = make(chan SdkMessage)
+ SdkMap[key] = make(chan *protomsg.SdkMessage)
fmt.Println("鍒涘缓涓婚 sdk: ", key)
} else {
close(SdkMap[key])
@@ -76,26 +79,25 @@
}
//涓婚
-var SdkMap = make(map[string]chan SdkMessage)
-
-// 鍙戦�佺粰绠楁硶杩涚▼鐨勭粨鏋�
-type SdkMessage struct {
- Cid string
- Tasklab tasktag.TaskLabel
- Data []byte
-}
+var SdkMap = make(map[string]chan *protomsg.SdkMessage)
//sdk鏁版嵁 鍔犲伐鍣�
-func SdkData(cid string, taskid string, data []byte) (sdkmsg SdkMessage) {
+func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
+ var sdkmsg = &protomsg.SdkMessage{}
sdkmsg.Cid = cid
+ if _, ok := tasktag.TaskMapLab[taskid]; !ok {
+ sdkmsg.Tasklab = nil
+ return sdkmsg
+ }
+
sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
sdkmsg.Data = data
- return
+ return sdkmsg
}
//sdk鏁版嵁鍒嗗彂鍣�
-func SdkSendTopic(sdkmsg SdkMessage) (sdksend string) {
- if sdkmsg.Tasklab.Index < len(sdkmsg.Tasklab.Sdkids) {
+func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
+ if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
} else {
sdksend = "es"
@@ -115,11 +117,11 @@
// 鍒涘缓涓婚
func SdkCreateTopic(sdklist []string) (err error) {
for _, sdkid := range sdklist {
- SdkMap[sdkid] = make(chan SdkMessage)
+ SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
fmt.Println("create sdk channel: ", sdkid)
}
- SdkMap["es"] = make(chan SdkMessage)
+ SdkMap["es"] = make(chan *protomsg.SdkMessage)
fmt.Println("create es channel: ")
return nil
@@ -171,6 +173,8 @@
func Recv(socket SocketContext) {
socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
+
+ var repsdkmsg = &protomsg.SdkMessage{}
for {
select {
case <-socket.Context.Done():
@@ -181,45 +185,37 @@
//fmt.Printf("%s ", err)
continue
} else {
-
- var repsdkmsg SdkMessage
- var reps interface{}
-
- err = json.Unmarshal(msg, &reps)
+ err = proto.Unmarshal(msg, repsdkmsg)
if err != nil {
+ fmt.Println("unmarshal error: ", err)
continue
}
-
- switch v := reps.(type) {
- case map[string]interface{}:
- //璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
-
- json.Unmarshal(msg, &repsdkmsg)
- nexttopic := SdkSendTopic(repsdkmsg)
- SdkMap[nexttopic] <- repsdkmsg
- case string:
- fmt.Println("this string is: ", v)
- }
+ //璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
+ nexttopic := SdkSendTopic(repsdkmsg)
+ SdkMap[nexttopic] <- repsdkmsg
}
}
}
}
-func send(sdkid string, socket SocketContext, in chan SdkMessage) {
- var v SdkMessage
- var b []byte
+func send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
+ var v *protomsg.SdkMessage
for {
select {
case <-socket.Context.Done():
fmt.Println("socket is close")
case v = <-in:
- b, _ = json.Marshal(v)
- fmt.Printf("浠庣閬搒dkid=%s 鎺ュ彈鏁版嵁 %d\n", sdkid, len(v.Data))
- if err := socket.Sock.Send(b); err != nil {
+ data, err := proto.Marshal(v)
+ if err != nil {
+ fmt.Println("proto marshal error ", err)
+ }
+ fmt.Printf("浠庣閬搒dkid=%s 鎺ュ彈鏁版嵁 %d\n", sdkid, len(data))
+
+ if err := socket.Sock.Send(data); err != nil {
fmt.Println("failed send")
}
- fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(v.Data))
+ fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
}
}
}
diff --git a/tasktag/tasktag.go b/tasktag/tasktag.go
index fe83e45..24fb853 100644
--- a/tasktag/tasktag.go
+++ b/tasktag/tasktag.go
@@ -2,31 +2,23 @@
import (
"fmt"
+
+ "github.com/long/test/protomsg"
)
-/* 浠诲姟鏍囩鐢熸垚鍣�
- * 鎻忚堪锛� 鐢ㄤ簬鍦ㄦ暟鎹繘鍏ュ埌鐗瑰畾鐨則ask鏃讹紝 缁欒繖涓暟鎹姞涓婁换鍔℃爣绛撅紝杩欐牱灏辫兘鐭ラ亾绠楁硶娴佺▼浜�
- */
-
-type TaskLabel struct {
- Taskid string
- Sdkids []string
- Index int
-}
-
-var TaskMapLab = make(map[string]TaskLabel)
+var TaskMapLab = make(map[string]*protomsg.TaskLabel)
//
func Init() {
- var tls []TaskLabel
+ var tls []protomsg.TaskLabel
sdk1 := "812b674b-2375-4589-919a-5c1c3278a972"
sdk2 := "812b674b-2375-4589-919a-5c1c3278a971"
- task1 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d1", []string{sdk1, sdk2}, 0}
+ task1 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d1", Sdkids: []string{sdk1, sdk2}, Index: int32(0)}
tls = append(tls, task1)
- task2 := TaskLabel{"5b0902ae-b1bd-43c0-816d-0a87f1f859d2", []string{sdk2}, 0}
+ task2 := protomsg.TaskLabel{Taskid: "5b0902ae-b1bd-43c0-816d-0a87f1f859d2", Sdkids: []string{sdk1}, Index: int32(0)}
tls = append(tls, task2)
GenTasklab(tls)
@@ -38,8 +30,8 @@
// 浠巗qlite 鎺ュ彛鎷垮埌鎵�鏈夌殑浠诲姟, 姣忎竴涓换鍔¢兘鏈夎嚜宸辩殑鍑犱釜绠楁硶
//浠� taskid 浣滀负key, 瀵瑰簲鐨勭畻娉曠粍鍚堜綔涓� value
-func GenTasklab(tasklab []TaskLabel) {
+func GenTasklab(tasklab []protomsg.TaskLabel) {
for _, value := range tasklab {
- TaskMapLab[value.Taskid] = value
+ TaskMapLab[value.Taskid] = &value
}
}
diff --git a/test b/test
index aa1a6c5..ad9f9e3 100755
--- a/test
+++ b/test
Binary files differ
--
Gitblit v1.8.0