554325746@qq.com
2019-06-26 5fdc5059555e7a4cf533b7bfdb79025f23fa2b6a
sdk/sdk.go
@@ -4,6 +4,7 @@
   "context"
   "errors"
   "fmt"
    "os"
   //   "github.com/long/test/httpclient"
   "github.com/long/test/tasktag"
@@ -21,7 +22,7 @@
)
var SocketManage = make(map[string]SocketContext)
var SdkMap = make(map[string]chan *protomsg.SdkMessage)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
   Sock    deliver.Deliver
@@ -37,14 +38,14 @@
      fmt.Println()
   }
   SdkMap["es"] = make(chan *protomsg.SdkMessage)
   SdkMap["es"] = make(chan protomsg.SdkMessage)
   fmt.Println("create es channel:  ")
   go es(SdkMap["es"])
   go AutoDelSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
   SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
   SdkMap[sdkid] = make(chan protomsg.SdkMessage)
   fmt.Println("create sdk channel:  ", sdkid)
   url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
@@ -77,7 +78,7 @@
}
//单独处理   es 主题的情况
func es(sdkmsgchan chan *protomsg.SdkMessage) {
func es(sdkmsgchan chan protomsg.SdkMessage) {
   for _ = range sdkmsgchan {
      fmt.Println("this data is finish all sdk! ")
   }
@@ -110,14 +111,10 @@
//主题
//sdk数据 加工器
func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
   var sdkmsg = &protomsg.SdkMessage{}
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
   var sdkmsg = protomsg.SdkMessage{}
   sdkmsg.Cid = cid
   //if _, ok := tasktag.TaskMapLab[taskid]; !ok {
   //   sdkmsg.Tasklab = nil
   //   return sdkmsg
   //}
    sdkmsg.Caddr =caddr
   if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
      sdkmsg.Tasklab = nil
      return sdkmsg
@@ -132,13 +129,13 @@
}
//sdk数据分发器
func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
   if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
      sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
   if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
      sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid
   } else {
      sdksend = "es"
   }
   fmt.Println("分发的主题是: ", sdksend)
   fmt.Printf("分发的主题是:%s 位置 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
   return
}
@@ -179,7 +176,7 @@
func Recv(socket SocketContext) {
   var repsdkmsg = &protomsg.SdkMessage{}
   var repsdkmsg = protomsg.SdkMessage{}
   for {
      select {
      case <-socket.Context.Done():
@@ -190,12 +187,14 @@
            //fmt.Printf("%s ", err)
            continue
         } else {
            err = proto.Unmarshal(msg, repsdkmsg)
            err = proto.Unmarshal(msg, &repsdkmsg)
            fmt.Println("receive len: ", len(msg))
            if err != nil {
               fmt.Println("unmarshal error: ", err)
                    os.Exit(1)
               continue
            }
                repsdkmsg.Tasklab.Index++
            //调用计算函数, 分发给下一个主题
            nexttopic := SdkSendTopic(repsdkmsg)
            SdkMap[nexttopic] <- repsdkmsg
@@ -204,18 +203,18 @@
   }
}
func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
   var v *protomsg.SdkMessage
   var ok bool
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
//   var v *protomsg.SdkMessage
//   var ok bool
   for {
      select {
      case <-socket.Context.Done():
         fmt.Println("socket is close")
         return
      case v, ok = <-in:
        case v, ok := <-in:
         if ok {
            data, err := proto.Marshal(v)
            data, err :=v.Marshal()
            if err != nil {
               fmt.Println("proto marshal error ", err)
               continue