From 5fdc5059555e7a4cf533b7bfdb79025f23fa2b6a Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期三, 26 六月 2019 14:53:46 +0800
Subject: [PATCH] task with zhangmeng ok release 2
---
sdk/sdk.go | 43 +++++++++++++++++++++----------------------
1 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/sdk/sdk.go b/sdk/sdk.go
index bd933e7..7a1c554 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
+ "os"
// "github.com/long/test/httpclient"
"github.com/long/test/tasktag"
@@ -21,7 +22,7 @@
)
var SocketManage = make(map[string]SocketContext)
-var SdkMap = make(map[string]chan *protomsg.SdkMessage)
+var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
Sock deliver.Deliver
@@ -37,14 +38,14 @@
fmt.Println()
}
- SdkMap["es"] = make(chan *protomsg.SdkMessage)
+ SdkMap["es"] = make(chan protomsg.SdkMessage)
fmt.Println("create es channel: ")
go es(SdkMap["es"])
go AutoDelSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
- SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
+ SdkMap[sdkid] = make(chan protomsg.SdkMessage)
fmt.Println("create sdk channel: ", sdkid)
url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
@@ -77,7 +78,7 @@
}
//鍗曠嫭澶勭悊 es 涓婚鐨勬儏鍐�
-func es(sdkmsgchan chan *protomsg.SdkMessage) {
+func es(sdkmsgchan chan protomsg.SdkMessage) {
for _ = range sdkmsgchan {
fmt.Println("this data is finish all sdk! ")
}
@@ -110,14 +111,10 @@
//涓婚
//sdk鏁版嵁 鍔犲伐鍣�
-func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
- var sdkmsg = &protomsg.SdkMessage{}
+func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
+ var sdkmsg = protomsg.SdkMessage{}
sdkmsg.Cid = cid
- //if _, ok := tasktag.TaskMapLab[taskid]; !ok {
- // sdkmsg.Tasklab = nil
- // return sdkmsg
- //}
-
+ sdkmsg.Caddr =caddr
if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
sdkmsg.Tasklab = nil
return sdkmsg
@@ -132,13 +129,13 @@
}
//sdk鏁版嵁鍒嗗彂鍣�
-func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
- if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
- sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
+func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
+ if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
+ sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid
} else {
sdksend = "es"
}
- fmt.Println("鍒嗗彂鐨勪富棰樻槸锛� ", sdksend)
+ fmt.Printf("鍒嗗彂鐨勪富棰樻槸锛�%s 浣嶇疆 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
return
}
@@ -179,7 +176,7 @@
func Recv(socket SocketContext) {
- var repsdkmsg = &protomsg.SdkMessage{}
+ var repsdkmsg = protomsg.SdkMessage{}
for {
select {
case <-socket.Context.Done():
@@ -190,12 +187,14 @@
//fmt.Printf("%s ", err)
continue
} else {
- err = proto.Unmarshal(msg, repsdkmsg)
+ err = proto.Unmarshal(msg, &repsdkmsg)
fmt.Println("receive len: ", len(msg))
if err != nil {
fmt.Println("unmarshal error: ", err)
+ os.Exit(1)
continue
}
+ repsdkmsg.Tasklab.Index++
//璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
nexttopic := SdkSendTopic(repsdkmsg)
SdkMap[nexttopic] <- repsdkmsg
@@ -204,18 +203,18 @@
}
}
-func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
- var v *protomsg.SdkMessage
- var ok bool
+func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
+// var v *protomsg.SdkMessage
+// var ok bool
for {
select {
case <-socket.Context.Done():
fmt.Println("socket is close")
return
- case v, ok = <-in:
+ case v, ok := <-in:
if ok {
- data, err := proto.Marshal(v)
+ data, err :=v.Marshal()
if err != nil {
fmt.Println("proto marshal error ", err)
continue
--
Gitblit v1.8.0